awsprovider.go 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "k8s.io/klog"
  18. "github.com/kubecost/cost-model/pkg/clustercache"
  19. "github.com/kubecost/cost-model/pkg/env"
  20. "github.com/kubecost/cost-model/pkg/errors"
  21. "github.com/kubecost/cost-model/pkg/log"
  22. "github.com/kubecost/cost-model/pkg/util"
  23. "github.com/aws/aws-sdk-go/aws"
  24. "github.com/aws/aws-sdk-go/aws/awserr"
  25. "github.com/aws/aws-sdk-go/aws/credentials"
  26. "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
  27. "github.com/aws/aws-sdk-go/aws/session"
  28. "github.com/aws/aws-sdk-go/service/athena"
  29. "github.com/aws/aws-sdk-go/service/ec2"
  30. "github.com/aws/aws-sdk-go/service/s3"
  31. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  32. "github.com/jszwec/csvutil"
  33. v1 "k8s.io/api/core/v1"
  34. )
  35. const awsReservedInstancePricePerHour = 0.0287
  36. const supportedSpotFeedVersion = "1"
  37. const SpotInfoUpdateType = "spotinfo"
  38. const AthenaInfoUpdateType = "athenainfo"
  39. // How often spot data is refreshed
  40. const SpotRefreshDuration = 15 * time.Minute
  41. const defaultConfigPath = "/var/configs/"
  42. var awsRegions = []string{
  43. "us-east-2",
  44. "us-east-1",
  45. "us-west-1",
  46. "us-west-2",
  47. "ap-east-1",
  48. "ap-south-1",
  49. "ap-northeast-3",
  50. "ap-northeast-2",
  51. "ap-southeast-1",
  52. "ap-southeast-2",
  53. "ap-northeast-1",
  54. "ca-central-1",
  55. "cn-north-1",
  56. "cn-northwest-1",
  57. "eu-central-1",
  58. "eu-west-1",
  59. "eu-west-2",
  60. "eu-west-3",
  61. "eu-north-1",
  62. "me-south-1",
  63. "sa-east-1",
  64. "us-gov-east-1",
  65. "us-gov-west-1",
  66. }
  67. // AWS represents an Amazon Provider
  68. type AWS struct {
  69. Pricing map[string]*AWSProductTerms
  70. SpotPricingByInstanceID map[string]*spotInfo
  71. SpotPricingUpdatedAt *time.Time
  72. SpotRefreshRunning bool
  73. SpotPricingLock sync.RWMutex
  74. RIPricingByInstanceID map[string]*RIData
  75. RIDataRunning bool
  76. RIDataLock sync.RWMutex
  77. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  78. SavingsPlanDataRunning bool
  79. SavingsPlanDataLock sync.RWMutex
  80. ValidPricingKeys map[string]bool
  81. Clientset clustercache.ClusterCache
  82. BaseCPUPrice string
  83. BaseRAMPrice string
  84. BaseGPUPrice string
  85. BaseSpotCPUPrice string
  86. BaseSpotRAMPrice string
  87. SpotLabelName string
  88. SpotLabelValue string
  89. ServiceKeyName string
  90. ServiceKeySecret string
  91. SpotDataRegion string
  92. SpotDataBucket string
  93. SpotDataPrefix string
  94. ProjectID string
  95. DownloadPricingDataLock sync.RWMutex
  96. Config *ProviderConfig
  97. ServiceAccountChecks map[string]*ServiceAccountCheck
  98. *CustomProvider
  99. }
  100. type AWSAccessKey struct {
  101. AccessKeyID string `json:"aws_access_key_id"`
  102. SecretAccessKey string `json:"aws_secret_access_key"`
  103. }
  104. // AWSPricing maps a k8s node to an AWS Pricing "product"
  105. type AWSPricing struct {
  106. Products map[string]*AWSProduct `json:"products"`
  107. Terms AWSPricingTerms `json:"terms"`
  108. }
  109. // AWSProduct represents a purchased SKU
  110. type AWSProduct struct {
  111. Sku string `json:"sku"`
  112. Attributes AWSProductAttributes `json:"attributes"`
  113. }
  114. // AWSProductAttributes represents metadata about the product used to map to a node.
  115. type AWSProductAttributes struct {
  116. Location string `json:"location"`
  117. InstanceType string `json:"instanceType"`
  118. Memory string `json:"memory"`
  119. Storage string `json:"storage"`
  120. VCpu string `json:"vcpu"`
  121. UsageType string `json:"usagetype"`
  122. OperatingSystem string `json:"operatingSystem"`
  123. PreInstalledSw string `json:"preInstalledSw"`
  124. InstanceFamily string `json:"instanceFamily"`
  125. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  126. }
  127. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  128. type AWSPricingTerms struct {
  129. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  130. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  131. }
  132. // AWSOfferTerm is a sku extension used to pay for the node.
  133. type AWSOfferTerm struct {
  134. Sku string `json:"sku"`
  135. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  136. }
  137. // AWSRateCode encodes data about the price of a product
  138. type AWSRateCode struct {
  139. Unit string `json:"unit"`
  140. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  141. }
  142. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  143. type AWSCurrencyCode struct {
  144. USD string `json:"USD"`
  145. }
  146. // AWSProductTerms represents the full terms of the product
  147. type AWSProductTerms struct {
  148. Sku string `json:"sku"`
  149. OnDemand *AWSOfferTerm `json:"OnDemand"`
  150. Reserved *AWSOfferTerm `json:"Reserved"`
  151. Memory string `json:"memory"`
  152. Storage string `json:"storage"`
  153. VCpu string `json:"vcpu"`
  154. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  155. PV *PV `json:"pv"`
  156. }
  157. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  158. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  159. // OnDemandRateCode is appended to an node sku
  160. const OnDemandRateCode = ".JRTCKXETXF"
  161. // ReservedRateCode is appended to a node sku
  162. const ReservedRateCode = ".38NPMPTW36"
  163. // HourlyRateCode is appended to a node sku
  164. const HourlyRateCode = ".6YS6EN2CT7"
  165. // volTypes are used to map between AWS UsageTypes and
  166. // EBS volume types, as they would appear in K8s storage class
  167. // name and the EC2 API.
  168. var volTypes = map[string]string{
  169. "EBS:VolumeUsage.gp2": "gp2",
  170. "EBS:VolumeUsage": "standard",
  171. "EBS:VolumeUsage.sc1": "sc1",
  172. "EBS:VolumeP-IOPS.piops": "io1",
  173. "EBS:VolumeUsage.st1": "st1",
  174. "EBS:VolumeUsage.piops": "io1",
  175. "gp2": "EBS:VolumeUsage.gp2",
  176. "standard": "EBS:VolumeUsage",
  177. "sc1": "EBS:VolumeUsage.sc1",
  178. "io1": "EBS:VolumeUsage.piops",
  179. "st1": "EBS:VolumeUsage.st1",
  180. }
  181. // locationToRegion maps AWS region names (As they come from Billing)
  182. // to actual region identifiers
  183. var locationToRegion = map[string]string{
  184. "US East (Ohio)": "us-east-2",
  185. "US East (N. Virginia)": "us-east-1",
  186. "US West (N. California)": "us-west-1",
  187. "US West (Oregon)": "us-west-2",
  188. "Asia Pacific (Hong Kong)": "ap-east-1",
  189. "Asia Pacific (Mumbai)": "ap-south-1",
  190. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  191. "Asia Pacific (Seoul)": "ap-northeast-2",
  192. "Asia Pacific (Singapore)": "ap-southeast-1",
  193. "Asia Pacific (Sydney)": "ap-southeast-2",
  194. "Asia Pacific (Tokyo)": "ap-northeast-1",
  195. "Canada (Central)": "ca-central-1",
  196. "China (Beijing)": "cn-north-1",
  197. "China (Ningxia)": "cn-northwest-1",
  198. "EU (Frankfurt)": "eu-central-1",
  199. "EU (Ireland)": "eu-west-1",
  200. "EU (London)": "eu-west-2",
  201. "EU (Paris)": "eu-west-3",
  202. "EU (Stockholm)": "eu-north-1",
  203. "South America (Sao Paulo)": "sa-east-1",
  204. "AWS GovCloud (US-East)": "us-gov-east-1",
  205. "AWS GovCloud (US)": "us-gov-west-1",
  206. }
  207. var regionToBillingRegionCode = map[string]string{
  208. "us-east-2": "USE2",
  209. "us-east-1": "",
  210. "us-west-1": "USW1",
  211. "us-west-2": "USW2",
  212. "ap-east-1": "APE1",
  213. "ap-south-1": "APS3",
  214. "ap-northeast-3": "APN3",
  215. "ap-northeast-2": "APN2",
  216. "ap-southeast-1": "APS1",
  217. "ap-southeast-2": "APS2",
  218. "ap-northeast-1": "APN1",
  219. "ca-central-1": "CAN1",
  220. "cn-north-1": "",
  221. "cn-northwest-1": "",
  222. "eu-central-1": "EUC1",
  223. "eu-west-1": "EU",
  224. "eu-west-2": "EUW2",
  225. "eu-west-3": "EUW3",
  226. "eu-north-1": "EUN1",
  227. "sa-east-1": "SAE1",
  228. "us-gov-east-1": "UGE1",
  229. "us-gov-west-1": "UGW1",
  230. }
  231. var loadedAWSSecret bool = false
  232. var awsSecret *AWSAccessKey = nil
  233. func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
  234. return ""
  235. }
  236. // KubeAttrConversion maps the k8s labels for region to an aws region
  237. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  238. operatingSystem = strings.ToLower(operatingSystem)
  239. region := locationToRegion[location]
  240. return region + "," + instanceType + "," + operatingSystem
  241. }
  242. type AwsSpotFeedInfo struct {
  243. BucketName string `json:"bucketName"`
  244. Prefix string `json:"prefix"`
  245. Region string `json:"region"`
  246. AccountID string `json:"projectID"`
  247. ServiceKeyName string `json:"serviceKeyName"`
  248. ServiceKeySecret string `json:"serviceKeySecret"`
  249. SpotLabel string `json:"spotLabel"`
  250. SpotLabelValue string `json:"spotLabelValue"`
  251. }
  252. type AwsAthenaInfo struct {
  253. AthenaBucketName string `json:"athenaBucketName"`
  254. AthenaRegion string `json:"athenaRegion"`
  255. AthenaDatabase string `json:"athenaDatabase"`
  256. AthenaTable string `json:"athenaTable"`
  257. ServiceKeyName string `json:"serviceKeyName"`
  258. ServiceKeySecret string `json:"serviceKeySecret"`
  259. AccountID string `json:"projectID"`
  260. MasterPayerARN string `json:"masterPayerARN"`
  261. }
  262. func (aws *AWS) GetManagementPlatform() (string, error) {
  263. nodes := aws.Clientset.GetAllNodes()
  264. if len(nodes) > 0 {
  265. n := nodes[0]
  266. version := n.Status.NodeInfo.KubeletVersion
  267. if strings.Contains(version, "eks") {
  268. return "eks", nil
  269. }
  270. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  271. return "kops", nil
  272. }
  273. }
  274. return "", nil
  275. }
  276. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  277. c, err := aws.Config.GetCustomPricingData()
  278. if c.Discount == "" {
  279. c.Discount = "0%"
  280. }
  281. if c.NegotiatedDiscount == "" {
  282. c.NegotiatedDiscount = "0%"
  283. }
  284. if err != nil {
  285. return nil, err
  286. }
  287. return c, nil
  288. }
  289. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  290. return aws.Config.UpdateFromMap(a)
  291. }
  292. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  293. return aws.Config.Update(func(c *CustomPricing) error {
  294. if updateType == SpotInfoUpdateType {
  295. a := AwsSpotFeedInfo{}
  296. err := json.NewDecoder(r).Decode(&a)
  297. if err != nil {
  298. return err
  299. }
  300. c.ServiceKeyName = a.ServiceKeyName
  301. if a.ServiceKeySecret != "" {
  302. c.ServiceKeySecret = a.ServiceKeySecret
  303. }
  304. c.SpotDataPrefix = a.Prefix
  305. c.SpotDataBucket = a.BucketName
  306. c.ProjectID = a.AccountID
  307. c.SpotDataRegion = a.Region
  308. c.SpotLabel = a.SpotLabel
  309. c.SpotLabelValue = a.SpotLabelValue
  310. } else if updateType == AthenaInfoUpdateType {
  311. a := AwsAthenaInfo{}
  312. err := json.NewDecoder(r).Decode(&a)
  313. if err != nil {
  314. return err
  315. }
  316. c.AthenaBucketName = a.AthenaBucketName
  317. c.AthenaRegion = a.AthenaRegion
  318. c.AthenaDatabase = a.AthenaDatabase
  319. c.AthenaTable = a.AthenaTable
  320. c.ServiceKeyName = a.ServiceKeyName
  321. if a.ServiceKeySecret != "" {
  322. c.ServiceKeySecret = a.ServiceKeySecret
  323. }
  324. if a.MasterPayerARN != "" {
  325. c.MasterPayerARN = a.MasterPayerARN
  326. }
  327. c.AthenaProjectID = a.AccountID
  328. } else {
  329. a := make(map[string]interface{})
  330. err := json.NewDecoder(r).Decode(&a)
  331. if err != nil {
  332. return err
  333. }
  334. for k, v := range a {
  335. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  336. vstr, ok := v.(string)
  337. if ok {
  338. err := SetCustomPricingField(c, kUpper, vstr)
  339. if err != nil {
  340. return err
  341. }
  342. } else {
  343. sci := v.(map[string]interface{})
  344. sc := make(map[string]string)
  345. for k, val := range sci {
  346. sc[k] = val.(string)
  347. }
  348. c.SharedCosts = sc //todo: support reflection/multiple map fields
  349. }
  350. }
  351. }
  352. if env.IsRemoteEnabled() {
  353. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  354. if err != nil {
  355. return err
  356. }
  357. }
  358. return nil
  359. })
  360. }
  361. type awsKey struct {
  362. SpotLabelName string
  363. SpotLabelValue string
  364. Labels map[string]string
  365. ProviderID string
  366. }
  367. func (k *awsKey) GPUType() string {
  368. return ""
  369. }
  370. func (k *awsKey) ID() string {
  371. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  372. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  373. if matchNum == 2 {
  374. return group
  375. }
  376. }
  377. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  378. return ""
  379. }
  380. func (k *awsKey) Features() string {
  381. instanceType := k.Labels[v1.LabelInstanceType]
  382. var operatingSystem string
  383. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  384. if !ok {
  385. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  386. }
  387. region := k.Labels[v1.LabelZoneRegion]
  388. key := region + "," + instanceType + "," + operatingSystem
  389. usageType := "preemptible"
  390. spotKey := key + "," + usageType
  391. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  392. return spotKey
  393. }
  394. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  395. return spotKey
  396. }
  397. return key
  398. }
  399. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  400. pricing, ok := aws.Pricing[pvk.Features()]
  401. if !ok {
  402. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  403. return &PV{}, nil
  404. }
  405. return pricing.PV, nil
  406. }
  407. type awsPVKey struct {
  408. Labels map[string]string
  409. StorageClassParameters map[string]string
  410. StorageClassName string
  411. Name string
  412. DefaultRegion string
  413. }
  414. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  415. return &awsPVKey{
  416. Labels: pv.Labels,
  417. StorageClassName: pv.Spec.StorageClassName,
  418. StorageClassParameters: parameters,
  419. Name: pv.Name,
  420. DefaultRegion: defaultRegion,
  421. }
  422. }
  423. func (key *awsPVKey) GetStorageClass() string {
  424. return key.StorageClassName
  425. }
  426. func (key *awsPVKey) Features() string {
  427. storageClass := key.StorageClassParameters["type"]
  428. if storageClass == "standard" {
  429. storageClass = "gp2"
  430. }
  431. // Storage class names are generally EBS volume types (gp2)
  432. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  433. // Converts between the 2
  434. region := key.Labels[v1.LabelZoneRegion]
  435. //if region == "" {
  436. // region = "us-east-1"
  437. //}
  438. class, ok := volTypes[storageClass]
  439. if !ok {
  440. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  441. }
  442. return region + "," + class
  443. }
  444. // GetKey maps node labels to information needed to retrieve pricing data
  445. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  446. return &awsKey{
  447. SpotLabelName: aws.SpotLabelName,
  448. SpotLabelValue: aws.SpotLabelValue,
  449. Labels: labels,
  450. ProviderID: labels["providerID"],
  451. }
  452. }
  453. func (aws *AWS) isPreemptible(key string) bool {
  454. s := strings.Split(key, ",")
  455. if len(s) == 4 && s[3] == "preemptible" {
  456. return true
  457. }
  458. return false
  459. }
  460. // DownloadPricingData fetches data from the AWS Pricing API
  461. func (aws *AWS) DownloadPricingData() error {
  462. aws.DownloadPricingDataLock.Lock()
  463. defer aws.DownloadPricingDataLock.Unlock()
  464. if aws.ServiceAccountChecks == nil {
  465. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  466. }
  467. c, err := aws.Config.GetCustomPricingData()
  468. if err != nil {
  469. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  470. }
  471. aws.BaseCPUPrice = c.CPU
  472. aws.BaseRAMPrice = c.RAM
  473. aws.BaseGPUPrice = c.GPU
  474. aws.BaseSpotCPUPrice = c.SpotCPU
  475. aws.BaseSpotRAMPrice = c.SpotRAM
  476. aws.SpotLabelName = c.SpotLabel
  477. aws.SpotLabelValue = c.SpotLabelValue
  478. aws.SpotDataBucket = c.SpotDataBucket
  479. aws.SpotDataPrefix = c.SpotDataPrefix
  480. aws.ProjectID = c.ProjectID
  481. aws.SpotDataRegion = c.SpotDataRegion
  482. skn, sks := aws.getAWSAuth(false, c)
  483. aws.ServiceKeyName = skn
  484. aws.ServiceKeySecret = sks
  485. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  486. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  487. }
  488. nodeList := aws.Clientset.GetAllNodes()
  489. inputkeys := make(map[string]bool)
  490. for _, n := range nodeList {
  491. labels := n.GetObjectMeta().GetLabels()
  492. key := aws.GetKey(labels, n)
  493. inputkeys[key.Features()] = true
  494. }
  495. pvList := aws.Clientset.GetAllPersistentVolumes()
  496. storageClasses := aws.Clientset.GetAllStorageClasses()
  497. storageClassMap := make(map[string]map[string]string)
  498. for _, storageClass := range storageClasses {
  499. params := storageClass.Parameters
  500. storageClassMap[storageClass.ObjectMeta.Name] = params
  501. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  502. storageClassMap["default"] = params
  503. storageClassMap[""] = params
  504. }
  505. }
  506. pvkeys := make(map[string]PVKey)
  507. for _, pv := range pvList {
  508. params, ok := storageClassMap[pv.Spec.StorageClassName]
  509. if !ok {
  510. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  511. continue
  512. }
  513. key := aws.GetPVKey(pv, params, "")
  514. pvkeys[key.Features()] = key
  515. }
  516. // RIDataRunning establishes the existance of the goroutine. Since it's possible we
  517. // run multiple downloads, we don't want to create multiple go routines if one already exists
  518. if !aws.RIDataRunning && c.AthenaBucketName != "" {
  519. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  520. if err != nil {
  521. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  522. } else { // If we make one successful run, check on new reservation data every hour
  523. go func() {
  524. defer errors.HandlePanic()
  525. aws.RIDataRunning = true
  526. for {
  527. klog.Infof("Reserved Instance watcher running... next update in 1h")
  528. time.Sleep(time.Hour)
  529. err := aws.GetReservationDataFromAthena()
  530. if err != nil {
  531. klog.Infof("Error updating RI data: %s", err.Error())
  532. }
  533. }
  534. }()
  535. }
  536. }
  537. if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
  538. err = aws.GetSavingsPlanDataFromAthena()
  539. if err != nil {
  540. klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
  541. } else {
  542. go func() {
  543. defer errors.HandlePanic()
  544. aws.SavingsPlanDataRunning = true
  545. for {
  546. klog.Infof("Savings Plan watcher running... next update in 1h")
  547. time.Sleep(time.Hour)
  548. err := aws.GetSavingsPlanDataFromAthena()
  549. if err != nil {
  550. klog.Infof("Error updating Savings Plan data: %s", err.Error())
  551. }
  552. }
  553. }()
  554. }
  555. }
  556. aws.Pricing = make(map[string]*AWSProductTerms)
  557. aws.ValidPricingKeys = make(map[string]bool)
  558. skusToKeys := make(map[string]string)
  559. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  560. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  561. resp, err := http.Get(pricingURL)
  562. if err != nil {
  563. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  564. return err
  565. }
  566. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  567. dec := json.NewDecoder(resp.Body)
  568. for {
  569. t, err := dec.Token()
  570. if err == io.EOF {
  571. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  572. break
  573. }
  574. if t == "products" {
  575. _, err := dec.Token() // this should parse the opening "{""
  576. if err != nil {
  577. return err
  578. }
  579. for dec.More() {
  580. _, err := dec.Token() // the sku token
  581. if err != nil {
  582. return err
  583. }
  584. product := &AWSProduct{}
  585. err = dec.Decode(&product)
  586. if err != nil {
  587. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  588. break
  589. }
  590. if product.Attributes.PreInstalledSw == "NA" &&
  591. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  592. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  593. spotKey := key + ",preemptible"
  594. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  595. productTerms := &AWSProductTerms{
  596. Sku: product.Sku,
  597. Memory: product.Attributes.Memory,
  598. Storage: product.Attributes.Storage,
  599. VCpu: product.Attributes.VCpu,
  600. GPU: product.Attributes.GPU,
  601. }
  602. aws.Pricing[key] = productTerms
  603. aws.Pricing[spotKey] = productTerms
  604. skusToKeys[product.Sku] = key
  605. }
  606. aws.ValidPricingKeys[key] = true
  607. aws.ValidPricingKeys[spotKey] = true
  608. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  609. // UsageTypes may be prefixed with a region code - we're removing this when using
  610. // volTypes to keep lookups generic
  611. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  612. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  613. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  614. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  615. spotKey := key + ",preemptible"
  616. pv := &PV{
  617. Class: volTypes[usageTypeNoRegion],
  618. Region: locationToRegion[product.Attributes.Location],
  619. }
  620. productTerms := &AWSProductTerms{
  621. Sku: product.Sku,
  622. PV: pv,
  623. }
  624. aws.Pricing[key] = productTerms
  625. aws.Pricing[spotKey] = productTerms
  626. skusToKeys[product.Sku] = key
  627. aws.ValidPricingKeys[key] = true
  628. aws.ValidPricingKeys[spotKey] = true
  629. }
  630. }
  631. }
  632. if t == "terms" {
  633. _, err := dec.Token() // this should parse the opening "{""
  634. if err != nil {
  635. return err
  636. }
  637. termType, err := dec.Token()
  638. if err != nil {
  639. return err
  640. }
  641. if termType == "OnDemand" {
  642. _, err := dec.Token()
  643. if err != nil { // again, should parse an opening "{"
  644. return err
  645. }
  646. for dec.More() {
  647. sku, err := dec.Token()
  648. if err != nil {
  649. return err
  650. }
  651. _, err = dec.Token() // another opening "{"
  652. if err != nil {
  653. return err
  654. }
  655. skuOnDemand, err := dec.Token()
  656. if err != nil {
  657. return err
  658. }
  659. offerTerm := &AWSOfferTerm{}
  660. err = dec.Decode(&offerTerm)
  661. if err != nil {
  662. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  663. }
  664. if sku.(string)+OnDemandRateCode == skuOnDemand {
  665. key, ok := skusToKeys[sku.(string)]
  666. spotKey := key + ",preemptible"
  667. if ok {
  668. aws.Pricing[key].OnDemand = offerTerm
  669. aws.Pricing[spotKey].OnDemand = offerTerm
  670. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  671. // If the specific UsageType is the per IO cost used on io1 volumes
  672. // we need to add the per IO cost to the io1 PV cost
  673. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  674. // Add the per IO cost to the PV object for the io1 volume type
  675. aws.Pricing[key].PV.CostPerIO = cost
  676. } else if strings.Contains(key, "EBS:Volume") {
  677. // If volume, we need to get hourly cost and add it to the PV object
  678. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  679. costFloat, _ := strconv.ParseFloat(cost, 64)
  680. hourlyPrice := costFloat / 730
  681. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  682. }
  683. }
  684. }
  685. _, err = dec.Token()
  686. if err != nil {
  687. return err
  688. }
  689. }
  690. _, err = dec.Token()
  691. if err != nil {
  692. return err
  693. }
  694. }
  695. }
  696. }
  697. // Always run spot pricing refresh when performing download
  698. aws.refreshSpotPricing(true)
  699. // Only start a single refresh goroutine
  700. if !aws.SpotRefreshRunning {
  701. aws.SpotRefreshRunning = true
  702. go func() {
  703. defer errors.HandlePanic()
  704. for {
  705. klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  706. time.Sleep(SpotRefreshDuration)
  707. // Reoccurring refresh checks update times
  708. aws.refreshSpotPricing(false)
  709. }
  710. }()
  711. }
  712. return nil
  713. }
  714. func (aws *AWS) refreshSpotPricing(force bool) {
  715. aws.SpotPricingLock.Lock()
  716. defer aws.SpotPricingLock.Unlock()
  717. now := time.Now().UTC()
  718. updateTime := now.Add(-SpotRefreshDuration)
  719. // Return if there was an update time set and an hour hasn't elapsed
  720. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  721. return
  722. }
  723. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  724. if err != nil {
  725. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  726. return
  727. }
  728. // update time last updated
  729. aws.SpotPricingUpdatedAt = &now
  730. aws.SpotPricingByInstanceID = sp
  731. }
  732. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  733. func (aws *AWS) NetworkPricing() (*Network, error) {
  734. cpricing, err := aws.Config.GetCustomPricingData()
  735. if err != nil {
  736. return nil, err
  737. }
  738. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  739. if err != nil {
  740. return nil, err
  741. }
  742. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  743. if err != nil {
  744. return nil, err
  745. }
  746. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  747. if err != nil {
  748. return nil, err
  749. }
  750. return &Network{
  751. ZoneNetworkEgressCost: znec,
  752. RegionNetworkEgressCost: rnec,
  753. InternetNetworkEgressCost: inec,
  754. }, nil
  755. }
  756. // AllNodePricing returns all the billing data fetched.
  757. func (aws *AWS) AllNodePricing() (interface{}, error) {
  758. aws.DownloadPricingDataLock.RLock()
  759. defer aws.DownloadPricingDataLock.RUnlock()
  760. return aws.Pricing, nil
  761. }
  762. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  763. aws.SpotPricingLock.RLock()
  764. defer aws.SpotPricingLock.RUnlock()
  765. info, ok := aws.SpotPricingByInstanceID[instanceID]
  766. return info, ok
  767. }
  768. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  769. aws.RIDataLock.RLock()
  770. defer aws.RIDataLock.RUnlock()
  771. data, ok := aws.RIPricingByInstanceID[instanceID]
  772. return data, ok
  773. }
  774. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  775. aws.SavingsPlanDataLock.RLock()
  776. defer aws.SavingsPlanDataLock.RUnlock()
  777. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  778. return data, ok
  779. }
  780. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  781. key := k.Features()
  782. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  783. var spotcost string
  784. klog.V(3).Infof("Looking up spot data from feed for node %s", k.ID())
  785. arr := strings.Split(spotInfo.Charge, " ")
  786. if len(arr) == 2 {
  787. spotcost = arr[0]
  788. } else {
  789. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  790. }
  791. return &Node{
  792. Cost: spotcost,
  793. VCPU: terms.VCpu,
  794. RAM: terms.Memory,
  795. GPU: terms.GPU,
  796. Storage: terms.Storage,
  797. BaseCPUPrice: aws.BaseCPUPrice,
  798. BaseRAMPrice: aws.BaseRAMPrice,
  799. BaseGPUPrice: aws.BaseGPUPrice,
  800. UsageType: usageType,
  801. }, nil
  802. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  803. klog.Infof("Node %s marked preemitible but we have no data in spot feed", k.ID())
  804. return &Node{
  805. VCPU: terms.VCpu,
  806. VCPUCost: aws.BaseSpotCPUPrice,
  807. RAM: terms.Memory,
  808. GPU: terms.GPU,
  809. RAMCost: aws.BaseSpotRAMPrice,
  810. Storage: terms.Storage,
  811. BaseCPUPrice: aws.BaseCPUPrice,
  812. BaseRAMPrice: aws.BaseRAMPrice,
  813. BaseGPUPrice: aws.BaseGPUPrice,
  814. UsageType: usageType,
  815. }, nil
  816. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  817. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  818. return &Node{
  819. Cost: strCost,
  820. VCPU: terms.VCpu,
  821. RAM: terms.Memory,
  822. GPU: terms.GPU,
  823. Storage: terms.Storage,
  824. BaseCPUPrice: aws.BaseCPUPrice,
  825. BaseRAMPrice: aws.BaseRAMPrice,
  826. BaseGPUPrice: aws.BaseGPUPrice,
  827. UsageType: usageType,
  828. }, nil
  829. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  830. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  831. return &Node{
  832. Cost: strCost,
  833. VCPU: terms.VCpu,
  834. RAM: terms.Memory,
  835. GPU: terms.GPU,
  836. Storage: terms.Storage,
  837. BaseCPUPrice: aws.BaseCPUPrice,
  838. BaseRAMPrice: aws.BaseRAMPrice,
  839. BaseGPUPrice: aws.BaseGPUPrice,
  840. UsageType: usageType,
  841. }, nil
  842. }
  843. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  844. if !ok {
  845. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  846. }
  847. cost := c.PricePerUnit.USD
  848. return &Node{
  849. Cost: cost,
  850. VCPU: terms.VCpu,
  851. RAM: terms.Memory,
  852. GPU: terms.GPU,
  853. Storage: terms.Storage,
  854. BaseCPUPrice: aws.BaseCPUPrice,
  855. BaseRAMPrice: aws.BaseRAMPrice,
  856. BaseGPUPrice: aws.BaseGPUPrice,
  857. UsageType: usageType,
  858. }, nil
  859. }
  860. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  861. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  862. aws.DownloadPricingDataLock.RLock()
  863. defer aws.DownloadPricingDataLock.RUnlock()
  864. key := k.Features()
  865. usageType := "ondemand"
  866. if aws.isPreemptible(key) {
  867. usageType = "preemptible"
  868. }
  869. terms, ok := aws.Pricing[key]
  870. if ok {
  871. return aws.createNode(terms, usageType, k)
  872. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  873. aws.DownloadPricingDataLock.RUnlock()
  874. err := aws.DownloadPricingData()
  875. aws.DownloadPricingDataLock.RLock()
  876. if err != nil {
  877. return &Node{
  878. Cost: aws.BaseCPUPrice,
  879. BaseCPUPrice: aws.BaseCPUPrice,
  880. BaseRAMPrice: aws.BaseRAMPrice,
  881. BaseGPUPrice: aws.BaseGPUPrice,
  882. UsageType: usageType,
  883. UsesBaseCPUPrice: true,
  884. }, err
  885. }
  886. terms, termsOk := aws.Pricing[key]
  887. if !termsOk {
  888. return &Node{
  889. Cost: aws.BaseCPUPrice,
  890. BaseCPUPrice: aws.BaseCPUPrice,
  891. BaseRAMPrice: aws.BaseRAMPrice,
  892. BaseGPUPrice: aws.BaseGPUPrice,
  893. UsageType: usageType,
  894. UsesBaseCPUPrice: true,
  895. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  896. }
  897. return aws.createNode(terms, usageType, k)
  898. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  899. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  900. }
  901. }
  902. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  903. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  904. defaultClusterName := "AWS Cluster #1"
  905. c, err := awsProvider.GetConfig()
  906. if err != nil {
  907. return nil, err
  908. }
  909. remoteEnabled := env.IsRemoteEnabled()
  910. if c.ClusterName != "" {
  911. m := make(map[string]string)
  912. m["name"] = c.ClusterName
  913. m["provider"] = "AWS"
  914. m["id"] = env.GetClusterID()
  915. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  916. return m, nil
  917. }
  918. makeStructure := func(clusterName string) (map[string]string, error) {
  919. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  920. m := make(map[string]string)
  921. m["name"] = clusterName
  922. m["provider"] = "AWS"
  923. m["id"] = env.GetClusterID()
  924. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  925. return m, nil
  926. }
  927. maybeClusterId := env.GetAWSClusterID()
  928. if len(maybeClusterId) != 0 {
  929. return makeStructure(maybeClusterId)
  930. }
  931. // TODO: This should be cached, it can take a long time to hit the API
  932. //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  933. //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  934. //klog.Infof("nodelist get here %s", time.Now())
  935. //nodeList := awsProvider.Clientset.GetAllNodes()
  936. //klog.Infof("nodelist done here %s", time.Now())
  937. /*for _, n := range nodeList {
  938. region := ""
  939. instanceId := ""
  940. providerId := n.Spec.ProviderID
  941. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  942. if matchNum == 1 {
  943. region = group
  944. } else if matchNum == 2 {
  945. instanceId = group
  946. }
  947. }
  948. if len(instanceId) == 0 {
  949. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  950. continue
  951. }
  952. c := &aws.Config{
  953. Region: aws.String(region),
  954. }
  955. s := session.Must(session.NewSession(c))
  956. ec2Svc := ec2.New(s)
  957. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  958. InstanceIds: []*string{
  959. aws.String(instanceId),
  960. },
  961. })
  962. if diErr != nil {
  963. klog.Infof("Error describing instances: %s", diErr)
  964. continue
  965. }
  966. if len(di.Reservations) != 1 {
  967. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  968. continue
  969. }
  970. res := di.Reservations[0]
  971. if len(res.Instances) != 1 {
  972. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  973. continue
  974. }
  975. inst := res.Instances[0]
  976. for _, tag := range inst.Tags {
  977. tagKey := *tag.Key
  978. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  979. if matchNum != 1 {
  980. continue
  981. }
  982. return makeStructure(group)
  983. }
  984. }
  985. }*/
  986. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  987. return makeStructure(defaultClusterName)
  988. }
  989. // Gets the aws key id and secret
  990. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  991. if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
  992. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  993. }
  994. // 1. Check config values first (set from frontend UI)
  995. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  996. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  997. Message: "AWS ServiceKey exists",
  998. Status: true,
  999. }
  1000. return cp.ServiceKeyName, cp.ServiceKeySecret
  1001. }
  1002. // 2. Check for secret
  1003. s, _ := aws.loadAWSAuthSecret(forceReload)
  1004. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1005. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1006. Message: "AWS ServiceKey exists",
  1007. Status: true,
  1008. }
  1009. return s.AccessKeyID, s.SecretAccessKey
  1010. }
  1011. // 3. Fall back to env vars
  1012. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1013. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1014. Message: "AWS ServiceKey exists",
  1015. Status: false,
  1016. }
  1017. } else {
  1018. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1019. Message: "AWS ServiceKey exists",
  1020. Status: true,
  1021. }
  1022. }
  1023. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1024. }
  1025. // Load once and cache the result (even on failure). This is an install time secret, so
  1026. // we don't expect the secret to change. If it does, however, we can force reload using
  1027. // the input parameter.
  1028. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1029. if !force && loadedAWSSecret {
  1030. return awsSecret, nil
  1031. }
  1032. loadedAWSSecret = true
  1033. exists, err := util.FileExists(authSecretPath)
  1034. if !exists || err != nil {
  1035. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1036. }
  1037. result, err := ioutil.ReadFile(authSecretPath)
  1038. if err != nil {
  1039. return nil, err
  1040. }
  1041. var ak AWSAccessKey
  1042. err = json.Unmarshal(result, &ak)
  1043. if err != nil {
  1044. return nil, err
  1045. }
  1046. awsSecret = &ak
  1047. return awsSecret, nil
  1048. }
  1049. func (aws *AWS) configureAWSAuth() error {
  1050. accessKeyID := aws.ServiceKeyName
  1051. accessKeySecret := aws.ServiceKeySecret
  1052. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1053. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1054. if err != nil {
  1055. return err
  1056. }
  1057. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1058. if err != nil {
  1059. return err
  1060. }
  1061. }
  1062. return nil
  1063. }
  1064. func getClusterConfig(ccFile string) (map[string]string, error) {
  1065. clusterConfig, err := os.Open(ccFile)
  1066. if err != nil {
  1067. return nil, err
  1068. }
  1069. defer clusterConfig.Close()
  1070. b, err := ioutil.ReadAll(clusterConfig)
  1071. if err != nil {
  1072. return nil, err
  1073. }
  1074. var clusterConf map[string]string
  1075. err = json.Unmarshal([]byte(b), &clusterConf)
  1076. if err != nil {
  1077. return nil, err
  1078. }
  1079. return clusterConf, nil
  1080. }
  1081. // SetKeyEnv ensures that the two environment variables necessary to configure
  1082. // a new AWS Session are set.
  1083. func (a *AWS) SetKeyEnv() error {
  1084. // TODO add this to the helm chart, mirroring the cost-model
  1085. // configPath := env.GetConfigPath()
  1086. configPath := defaultConfigPath
  1087. path := configPath + "aws.json"
  1088. if _, err := os.Stat(path); err != nil {
  1089. if os.IsNotExist(err) {
  1090. log.DedupedErrorf(5, "file %s does not exist", path)
  1091. } else {
  1092. log.DedupedErrorf(5, "other file open error: %s", err)
  1093. }
  1094. return err
  1095. }
  1096. jsonFile, err := os.Open(path)
  1097. defer jsonFile.Close()
  1098. configMap := map[string]string{}
  1099. configBytes, err := ioutil.ReadAll(jsonFile)
  1100. if err != nil {
  1101. return err
  1102. }
  1103. json.Unmarshal([]byte(configBytes), &configMap)
  1104. keyName := configMap["awsServiceKeyName"]
  1105. keySecret := configMap["awsServiceKeySecret"]
  1106. // These are required before calling NewEnvCredentials below
  1107. env.Set(env.AWSAccessKeyIDEnvVar, keyName)
  1108. env.Set(env.AWSAccessKeySecretEnvVar, keySecret)
  1109. return nil
  1110. }
  1111. func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
  1112. sess, err := session.NewSession(&aws.Config{
  1113. Region: aws.String(region),
  1114. Credentials: credentials.NewEnvCredentials(),
  1115. })
  1116. if err != nil {
  1117. return nil, err
  1118. }
  1119. ec2Svc := ec2.New(sess)
  1120. return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
  1121. }
  1122. func (a *AWS) GetAddresses() ([]byte, error) {
  1123. if err := a.SetKeyEnv(); err != nil {
  1124. return nil, err
  1125. }
  1126. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1127. errorCh := make(chan error, len(awsRegions))
  1128. var wg sync.WaitGroup
  1129. wg.Add(len(awsRegions))
  1130. // Get volumes from each AWS region
  1131. for _, r := range awsRegions {
  1132. // Fetch IP address response and send results and errors to their
  1133. // respective channels
  1134. go func(region string) {
  1135. defer wg.Done()
  1136. defer errors.HandlePanic()
  1137. // Query for first page of volume results
  1138. resp, err := a.getAddressesForRegion(region)
  1139. if err != nil {
  1140. if aerr, ok := err.(awserr.Error); ok {
  1141. switch aerr.Code() {
  1142. default:
  1143. errorCh <- aerr
  1144. }
  1145. return
  1146. } else {
  1147. errorCh <- err
  1148. return
  1149. }
  1150. }
  1151. addressCh <- resp
  1152. }(r)
  1153. }
  1154. // Close the result channels after everything has been sent
  1155. go func() {
  1156. defer errors.HandlePanic()
  1157. wg.Wait()
  1158. close(errorCh)
  1159. close(addressCh)
  1160. }()
  1161. addresses := []*ec2.Address{}
  1162. for adds := range addressCh {
  1163. addresses = append(addresses, adds.Addresses...)
  1164. }
  1165. errors := []error{}
  1166. for err := range errorCh {
  1167. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1168. errors = append(errors, err)
  1169. }
  1170. // Return error if no addresses are returned
  1171. if len(errors) > 0 && len(addresses) == 0 {
  1172. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
  1173. }
  1174. // Format the response this way to match the JSON-encoded formatting of a single response
  1175. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1176. // a "Addresss" key at the top level.
  1177. return json.Marshal(map[string][]*ec2.Address{
  1178. "Addresses": addresses,
  1179. })
  1180. }
  1181. func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1182. sess, err := session.NewSession(&aws.Config{
  1183. Region: aws.String(region),
  1184. Credentials: credentials.NewEnvCredentials(),
  1185. })
  1186. if err != nil {
  1187. return nil, err
  1188. }
  1189. ec2Svc := ec2.New(sess)
  1190. return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
  1191. MaxResults: &maxResults,
  1192. NextToken: nextToken,
  1193. })
  1194. }
  1195. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1196. func (a *AWS) GetDisks() ([]byte, error) {
  1197. if err := a.SetKeyEnv(); err != nil {
  1198. return nil, err
  1199. }
  1200. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1201. errorCh := make(chan error, len(awsRegions))
  1202. var wg sync.WaitGroup
  1203. wg.Add(len(awsRegions))
  1204. // Get volumes from each AWS region
  1205. for _, r := range awsRegions {
  1206. // Fetch volume response and send results and errors to their
  1207. // respective channels
  1208. go func(region string) {
  1209. defer wg.Done()
  1210. defer errors.HandlePanic()
  1211. // Query for first page of volume results
  1212. resp, err := a.getDisksForRegion(region, 1000, nil)
  1213. if err != nil {
  1214. if aerr, ok := err.(awserr.Error); ok {
  1215. switch aerr.Code() {
  1216. default:
  1217. errorCh <- aerr
  1218. }
  1219. return
  1220. } else {
  1221. errorCh <- err
  1222. return
  1223. }
  1224. }
  1225. volumeCh <- resp
  1226. // A NextToken indicates more pages of results. Keep querying
  1227. // until all pages are retrieved.
  1228. for resp.NextToken != nil {
  1229. resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
  1230. if err != nil {
  1231. if aerr, ok := err.(awserr.Error); ok {
  1232. switch aerr.Code() {
  1233. default:
  1234. errorCh <- aerr
  1235. }
  1236. return
  1237. } else {
  1238. errorCh <- err
  1239. return
  1240. }
  1241. }
  1242. volumeCh <- resp
  1243. }
  1244. }(r)
  1245. }
  1246. // Close the result channels after everything has been sent
  1247. go func() {
  1248. defer errors.HandlePanic()
  1249. wg.Wait()
  1250. close(errorCh)
  1251. close(volumeCh)
  1252. }()
  1253. volumes := []*ec2.Volume{}
  1254. for vols := range volumeCh {
  1255. volumes = append(volumes, vols.Volumes...)
  1256. }
  1257. errors := []error{}
  1258. for err := range errorCh {
  1259. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1260. errors = append(errors, err)
  1261. }
  1262. // Return error if no volumes are returned
  1263. if len(errors) > 0 && len(volumes) == 0 {
  1264. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
  1265. }
  1266. // Format the response this way to match the JSON-encoded formatting of a single response
  1267. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1268. // a "Volumes" key at the top level.
  1269. return json.Marshal(map[string][]*ec2.Volume{
  1270. "Volumes": volumes,
  1271. })
  1272. }
  1273. // ConvertToGlueColumnFormat takes a string and runs through various regex
  1274. // and string replacement statements to convert it to a format compatible
  1275. // with AWS Glue and Athena column names.
  1276. // Following guidance from AWS provided here ('Column Names' section):
  1277. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  1278. // It returns a string containing the column name in proper column name format and length.
  1279. func ConvertToGlueColumnFormat(column_name string) string {
  1280. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  1281. // An underscore is added in front of uppercase letters
  1282. capital_underscore := regexp.MustCompile(`[A-Z]`)
  1283. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  1284. // Any non-alphanumeric characters are replaced with an underscore
  1285. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  1286. final = no_space_punc.ReplaceAllString(final, "_")
  1287. // Duplicate underscores are removed
  1288. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  1289. final = no_dup_underscore.ReplaceAllString(final, "_")
  1290. // Any leading and trailing underscores are removed
  1291. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  1292. final = no_front_end_underscore.ReplaceAllString(final, "")
  1293. // Uppercase to lowercase
  1294. final = strings.ToLower(final)
  1295. // Longer column name than expected - remove _ left to right
  1296. allowed_col_len := 128
  1297. undersc_to_remove := len(final) - allowed_col_len
  1298. if undersc_to_remove > 0 {
  1299. final = strings.Replace(final, "_", "", undersc_to_remove)
  1300. }
  1301. // If removing all of the underscores still didn't
  1302. // make the column name < 128 characters, trim it!
  1303. if len(final) > allowed_col_len {
  1304. final = final[:allowed_col_len]
  1305. }
  1306. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  1307. return final
  1308. }
  1309. func generateAWSGroupBy(lastIdx int) string {
  1310. sequence := []string{}
  1311. for i := 1; i < lastIdx+1; i++ {
  1312. sequence = append(sequence, strconv.Itoa(i))
  1313. }
  1314. return strings.Join(sequence, ",")
  1315. }
  1316. func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
  1317. customPricing, err := a.GetConfig()
  1318. if err != nil {
  1319. return nil, err
  1320. }
  1321. if customPricing.ServiceKeyName != "" {
  1322. err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1323. if err != nil {
  1324. return nil, err
  1325. }
  1326. err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1327. if err != nil {
  1328. return nil, err
  1329. }
  1330. }
  1331. region := aws.String(customPricing.AthenaRegion)
  1332. resultsBucket := customPricing.AthenaBucketName
  1333. database := customPricing.AthenaDatabase
  1334. c := &aws.Config{
  1335. Region: region,
  1336. }
  1337. s := session.Must(session.NewSession(c))
  1338. svc := athena.New(s)
  1339. if customPricing.MasterPayerARN != "" {
  1340. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1341. svc = athena.New(s, &aws.Config{
  1342. Region: region,
  1343. Credentials: creds,
  1344. })
  1345. }
  1346. var e athena.StartQueryExecutionInput
  1347. var r athena.ResultConfiguration
  1348. r.SetOutputLocation(resultsBucket)
  1349. e.SetResultConfiguration(&r)
  1350. e.SetQueryString(query)
  1351. var q athena.QueryExecutionContext
  1352. q.SetDatabase(database)
  1353. e.SetQueryExecutionContext(&q)
  1354. res, err := svc.StartQueryExecution(&e)
  1355. if err != nil {
  1356. return nil, err
  1357. }
  1358. klog.V(2).Infof("StartQueryExecution result:")
  1359. klog.V(2).Infof(res.GoString())
  1360. var qri athena.GetQueryExecutionInput
  1361. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1362. var qrop *athena.GetQueryExecutionOutput
  1363. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1364. for {
  1365. qrop, err = svc.GetQueryExecution(&qri)
  1366. if err != nil {
  1367. return nil, err
  1368. }
  1369. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1370. break
  1371. }
  1372. time.Sleep(duration)
  1373. }
  1374. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1375. var ip athena.GetQueryResultsInput
  1376. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1377. return svc.GetQueryResults(&ip)
  1378. } else {
  1379. return nil, fmt.Errorf("No results available for %s", query)
  1380. }
  1381. }
  1382. type SavingsPlanData struct {
  1383. ResourceID string
  1384. EffectiveCost float64
  1385. SavingsPlanARN string
  1386. MostRecentDate string
  1387. }
  1388. func (a *AWS) GetSavingsPlanDataFromAthena() error {
  1389. cfg, err := a.GetConfig()
  1390. if err != nil {
  1391. return err
  1392. }
  1393. if cfg.AthenaBucketName == "" {
  1394. return fmt.Errorf("No Athena Bucket configured")
  1395. }
  1396. if a.SavingsPlanDataByInstanceID == nil {
  1397. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1398. }
  1399. tNow := time.Now()
  1400. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1401. start := tOneDayAgo.Format("2006-01-02")
  1402. end := tNow.Format("2006-01-02")
  1403. q := `SELECT
  1404. line_item_usage_start_date,
  1405. savings_plan_savings_plan_a_r_n,
  1406. line_item_resource_id,
  1407. savings_plan_savings_plan_effective_cost
  1408. FROM %s as cost_data
  1409. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1410. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1411. line_item_usage_start_date DESC`
  1412. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1413. op, err := a.QueryAthenaBillingData(query)
  1414. if err != nil {
  1415. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1416. }
  1417. klog.Infof("Fetching SavingsPlan data...")
  1418. if len(op.ResultSet.Rows) > 1 {
  1419. a.SavingsPlanDataLock.Lock()
  1420. mostRecentDate := ""
  1421. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1422. d := *r.Data[0].VarCharValue
  1423. if mostRecentDate == "" {
  1424. mostRecentDate = d
  1425. } else if mostRecentDate != d { // Get all most recent assignments
  1426. break
  1427. }
  1428. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1429. if err != nil {
  1430. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1431. }
  1432. r := &SavingsPlanData{
  1433. ResourceID: *r.Data[2].VarCharValue,
  1434. EffectiveCost: cost,
  1435. SavingsPlanARN: *r.Data[1].VarCharValue,
  1436. MostRecentDate: d,
  1437. }
  1438. a.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1439. }
  1440. klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
  1441. for k, r := range a.SavingsPlanDataByInstanceID {
  1442. klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1443. }
  1444. a.SavingsPlanDataLock.Unlock()
  1445. } else {
  1446. klog.Infof("No savings plan applied instance data found")
  1447. }
  1448. return nil
  1449. }
  1450. type RIData struct {
  1451. ResourceID string
  1452. EffectiveCost float64
  1453. ReservationARN string
  1454. MostRecentDate string
  1455. }
  1456. func (a *AWS) GetReservationDataFromAthena() error {
  1457. cfg, err := a.GetConfig()
  1458. if err != nil {
  1459. return err
  1460. }
  1461. if cfg.AthenaBucketName == "" {
  1462. return fmt.Errorf("No Athena Bucket configured")
  1463. }
  1464. if a.RIPricingByInstanceID == nil {
  1465. a.RIPricingByInstanceID = make(map[string]*RIData)
  1466. }
  1467. tNow := time.Now()
  1468. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1469. start := tOneDayAgo.Format("2006-01-02")
  1470. end := tNow.Format("2006-01-02")
  1471. q := `SELECT
  1472. line_item_usage_start_date,
  1473. reservation_reservation_a_r_n,
  1474. line_item_resource_id,
  1475. reservation_effective_cost
  1476. FROM %s as cost_data
  1477. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1478. AND reservation_reservation_a_r_n <> '' ORDER BY
  1479. line_item_usage_start_date DESC`
  1480. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1481. op, err := a.QueryAthenaBillingData(query)
  1482. if err != nil {
  1483. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1484. }
  1485. klog.Infof("Fetching RI data...")
  1486. if len(op.ResultSet.Rows) > 1 {
  1487. a.RIDataLock.Lock()
  1488. mostRecentDate := ""
  1489. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1490. d := *r.Data[0].VarCharValue
  1491. if mostRecentDate == "" {
  1492. mostRecentDate = d
  1493. } else if mostRecentDate != d { // Get all most recent assignments
  1494. break
  1495. }
  1496. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1497. if err != nil {
  1498. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1499. }
  1500. r := &RIData{
  1501. ResourceID: *r.Data[2].VarCharValue,
  1502. EffectiveCost: cost,
  1503. ReservationARN: *r.Data[1].VarCharValue,
  1504. MostRecentDate: d,
  1505. }
  1506. a.RIPricingByInstanceID[r.ResourceID] = r
  1507. }
  1508. klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
  1509. for k, r := range a.RIPricingByInstanceID {
  1510. klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1511. }
  1512. a.RIDataLock.Unlock()
  1513. } else {
  1514. klog.Infof("No reserved instance data found")
  1515. }
  1516. return nil
  1517. }
  1518. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  1519. // "start" and "end" are dates of the format YYYY-MM-DD
  1520. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  1521. func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
  1522. customPricing, err := a.GetConfig()
  1523. if err != nil {
  1524. return nil, err
  1525. }
  1526. formattedAggregators := []string{}
  1527. for _, agg := range aggregators {
  1528. aggregator_column_name := "resource_tags_user_" + agg
  1529. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  1530. formattedAggregators = append(formattedAggregators, aggregator_column_name)
  1531. }
  1532. aggregatorNames := strings.Join(formattedAggregators, ",")
  1533. aggregatorOr := strings.Join(formattedAggregators, " <> '' OR ")
  1534. aggregatorOr = aggregatorOr + " <> ''"
  1535. filter_column_name := "resource_tags_user_" + filterType
  1536. filter_column_name = ConvertToGlueColumnFormat(filter_column_name)
  1537. var query string
  1538. var lastIdx int
  1539. if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
  1540. lastIdx = len(formattedAggregators) + 3
  1541. groupby := generateAWSGroupBy(lastIdx)
  1542. query = fmt.Sprintf(`SELECT
  1543. CAST(line_item_usage_start_date AS DATE) as start_date,
  1544. %s,
  1545. line_item_product_code,
  1546. %s,
  1547. SUM(line_item_blended_cost) as blended_cost
  1548. FROM %s as cost_data
  1549. WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1550. GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, aggregatorOr, groupby)
  1551. } else {
  1552. lastIdx = len(formattedAggregators) + 2
  1553. groupby := generateAWSGroupBy(lastIdx)
  1554. query = fmt.Sprintf(`SELECT
  1555. CAST(line_item_usage_start_date AS DATE) as start_date,
  1556. %s,
  1557. line_item_product_code,
  1558. SUM(line_item_blended_cost) as blended_cost
  1559. FROM %s as cost_data
  1560. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1561. GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
  1562. }
  1563. klog.V(3).Infof("Running Query: %s", query)
  1564. if customPricing.ServiceKeyName != "" {
  1565. err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1566. if err != nil {
  1567. return nil, err
  1568. }
  1569. err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1570. if err != nil {
  1571. return nil, err
  1572. }
  1573. }
  1574. region := aws.String(customPricing.AthenaRegion)
  1575. resultsBucket := customPricing.AthenaBucketName
  1576. database := customPricing.AthenaDatabase
  1577. c := &aws.Config{
  1578. Region: region,
  1579. }
  1580. s := session.Must(session.NewSession(c))
  1581. svc := athena.New(s)
  1582. var e athena.StartQueryExecutionInput
  1583. var r athena.ResultConfiguration
  1584. r.SetOutputLocation(resultsBucket)
  1585. e.SetResultConfiguration(&r)
  1586. e.SetQueryString(query)
  1587. var q athena.QueryExecutionContext
  1588. q.SetDatabase(database)
  1589. e.SetQueryExecutionContext(&q)
  1590. res, err := svc.StartQueryExecution(&e)
  1591. if err != nil {
  1592. return nil, err
  1593. }
  1594. klog.V(2).Infof("StartQueryExecution result:")
  1595. klog.V(2).Infof(res.GoString())
  1596. var qri athena.GetQueryExecutionInput
  1597. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1598. var qrop *athena.GetQueryExecutionOutput
  1599. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1600. for {
  1601. qrop, err = svc.GetQueryExecution(&qri)
  1602. if err != nil {
  1603. return nil, err
  1604. }
  1605. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1606. break
  1607. }
  1608. time.Sleep(duration)
  1609. }
  1610. var oocAllocs []*OutOfClusterAllocation
  1611. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1612. var ip athena.GetQueryResultsInput
  1613. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1614. op, err := svc.GetQueryResults(&ip)
  1615. if err != nil {
  1616. return nil, err
  1617. }
  1618. if len(op.ResultSet.Rows) > 1 {
  1619. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows))] {
  1620. cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
  1621. if err != nil {
  1622. return nil, err
  1623. }
  1624. environment := ""
  1625. for _, d := range r.Data[1 : len(formattedAggregators)+1] {
  1626. if *d.VarCharValue != "" {
  1627. environment = *d.VarCharValue // just set to the first nonempty match
  1628. }
  1629. break
  1630. }
  1631. ooc := &OutOfClusterAllocation{
  1632. Aggregator: strings.Join(aggregators, ","),
  1633. Environment: environment,
  1634. Service: *r.Data[len(formattedAggregators)+1].VarCharValue,
  1635. Cost: cost,
  1636. }
  1637. oocAllocs = append(oocAllocs, ooc)
  1638. }
  1639. } else {
  1640. klog.V(1).Infof("No results available for %s at database %s between %s and %s", strings.Join(formattedAggregators, ","), customPricing.AthenaTable, start, end)
  1641. }
  1642. }
  1643. if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
  1644. gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
  1645. if err != nil {
  1646. klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
  1647. }
  1648. gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
  1649. if err != nil {
  1650. klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
  1651. }
  1652. oocAllocs = append(oocAllocs, gcpOOC...)
  1653. }
  1654. return oocAllocs, nil
  1655. }
  1656. // QuerySQL can query a properly configured Athena database.
  1657. // Used to fetch billing data.
  1658. // Requires a json config in /var/configs with key region, output, and database.
  1659. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1660. customPricing, err := a.GetConfig()
  1661. if err != nil {
  1662. return nil, err
  1663. }
  1664. if customPricing.ServiceKeyName != "" {
  1665. err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1666. if err != nil {
  1667. return nil, err
  1668. }
  1669. err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1670. if err != nil {
  1671. return nil, err
  1672. }
  1673. }
  1674. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1675. if err != nil {
  1676. return nil, err
  1677. }
  1678. defer athenaConfigs.Close()
  1679. b, err := ioutil.ReadAll(athenaConfigs)
  1680. if err != nil {
  1681. return nil, err
  1682. }
  1683. var athenaConf map[string]string
  1684. json.Unmarshal([]byte(b), &athenaConf)
  1685. region := aws.String(customPricing.AthenaRegion)
  1686. resultsBucket := customPricing.AthenaBucketName
  1687. database := customPricing.AthenaDatabase
  1688. c := &aws.Config{
  1689. Region: region,
  1690. }
  1691. s := session.Must(session.NewSession(c))
  1692. svc := athena.New(s)
  1693. var e athena.StartQueryExecutionInput
  1694. var r athena.ResultConfiguration
  1695. r.SetOutputLocation(resultsBucket)
  1696. e.SetResultConfiguration(&r)
  1697. e.SetQueryString(query)
  1698. var q athena.QueryExecutionContext
  1699. q.SetDatabase(database)
  1700. e.SetQueryExecutionContext(&q)
  1701. res, err := svc.StartQueryExecution(&e)
  1702. if err != nil {
  1703. return nil, err
  1704. }
  1705. klog.V(2).Infof("StartQueryExecution result:")
  1706. klog.V(2).Infof(res.GoString())
  1707. var qri athena.GetQueryExecutionInput
  1708. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1709. var qrop *athena.GetQueryExecutionOutput
  1710. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1711. for {
  1712. qrop, err = svc.GetQueryExecution(&qri)
  1713. if err != nil {
  1714. return nil, err
  1715. }
  1716. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1717. break
  1718. }
  1719. time.Sleep(duration)
  1720. }
  1721. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1722. var ip athena.GetQueryResultsInput
  1723. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1724. op, err := svc.GetQueryResults(&ip)
  1725. if err != nil {
  1726. return nil, err
  1727. }
  1728. b, err := json.Marshal(op.ResultSet)
  1729. if err != nil {
  1730. return nil, err
  1731. }
  1732. return b, nil
  1733. }
  1734. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1735. }
  1736. type spotInfo struct {
  1737. Timestamp string `csv:"Timestamp"`
  1738. UsageType string `csv:"UsageType"`
  1739. Operation string `csv:"Operation"`
  1740. InstanceID string `csv:"InstanceID"`
  1741. MyBidID string `csv:"MyBidID"`
  1742. MyMaxPrice string `csv:"MyMaxPrice"`
  1743. MarketPrice string `csv:"MarketPrice"`
  1744. Charge string `csv:"Charge"`
  1745. Version string `csv:"Version"`
  1746. }
  1747. type fnames []*string
  1748. func (f fnames) Len() int {
  1749. return len(f)
  1750. }
  1751. func (f fnames) Swap(i, j int) {
  1752. f[i], f[j] = f[j], f[i]
  1753. }
  1754. func (f fnames) Less(i, j int) bool {
  1755. key1 := strings.Split(*f[i], ".")
  1756. key2 := strings.Split(*f[j], ".")
  1757. t1, err := time.Parse("2006-01-02-15", key1[1])
  1758. if err != nil {
  1759. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1760. return false
  1761. }
  1762. t2, err := time.Parse("2006-01-02-15", key2[1])
  1763. if err != nil {
  1764. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1765. return false
  1766. }
  1767. return t1.Before(t2)
  1768. }
  1769. func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1770. if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
  1771. a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1772. }
  1773. // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1774. if accessKeyID != "" && accessKeySecret != "" {
  1775. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1776. if err != nil {
  1777. return nil, err
  1778. }
  1779. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1780. if err != nil {
  1781. return nil, err
  1782. }
  1783. }
  1784. s3Prefix := projectID
  1785. if len(prefix) != 0 {
  1786. s3Prefix = prefix + "/" + s3Prefix
  1787. }
  1788. c := aws.NewConfig().WithRegion(region)
  1789. s := session.Must(session.NewSession(c))
  1790. s3Svc := s3.New(s)
  1791. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1792. tNow := time.Now()
  1793. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1794. ls := &s3.ListObjectsInput{
  1795. Bucket: aws.String(bucket),
  1796. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1797. }
  1798. ls2 := &s3.ListObjectsInput{
  1799. Bucket: aws.String(bucket),
  1800. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1801. }
  1802. lso, err := s3Svc.ListObjects(ls)
  1803. if err != nil {
  1804. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1805. Message: "Bucket List Permissions Available",
  1806. Status: false,
  1807. AdditionalInfo: err.Error(),
  1808. }
  1809. return nil, err
  1810. } else {
  1811. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1812. Message: "Bucket List Permissions Available",
  1813. Status: true,
  1814. }
  1815. }
  1816. lsoLen := len(lso.Contents)
  1817. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1818. if lsoLen == 0 {
  1819. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1820. }
  1821. lso2, err := s3Svc.ListObjects(ls2)
  1822. if err != nil {
  1823. return nil, err
  1824. }
  1825. lso2Len := len(lso2.Contents)
  1826. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1827. if lso2Len == 0 {
  1828. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1829. }
  1830. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1831. var keys []*string
  1832. for _, obj := range lso.Contents {
  1833. keys = append(keys, obj.Key)
  1834. }
  1835. for _, obj := range lso2.Contents {
  1836. keys = append(keys, obj.Key)
  1837. }
  1838. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1839. header, err := csvutil.Header(spotInfo{}, "csv")
  1840. if err != nil {
  1841. return nil, err
  1842. }
  1843. fieldsPerRecord := len(header)
  1844. spots := make(map[string]*spotInfo)
  1845. for _, key := range keys {
  1846. getObj := &s3.GetObjectInput{
  1847. Bucket: aws.String(bucket),
  1848. Key: key,
  1849. }
  1850. buf := aws.NewWriteAtBuffer([]byte{})
  1851. _, err := downloader.Download(buf, getObj)
  1852. if err != nil {
  1853. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1854. Message: "Object Get Permissions Available",
  1855. Status: false,
  1856. AdditionalInfo: err.Error(),
  1857. }
  1858. return nil, err
  1859. } else {
  1860. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1861. Message: "Object Get Permissions Available",
  1862. Status: true,
  1863. }
  1864. }
  1865. r := bytes.NewReader(buf.Bytes())
  1866. gr, err := gzip.NewReader(r)
  1867. if err != nil {
  1868. return nil, err
  1869. }
  1870. csvReader := csv.NewReader(gr)
  1871. csvReader.Comma = '\t'
  1872. csvReader.FieldsPerRecord = fieldsPerRecord
  1873. dec, err := csvutil.NewDecoder(csvReader, header...)
  1874. if err != nil {
  1875. return nil, err
  1876. }
  1877. var foundVersion string
  1878. for {
  1879. spot := spotInfo{}
  1880. err := dec.Decode(&spot)
  1881. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1882. if err == io.EOF {
  1883. break
  1884. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1885. rec := dec.Record()
  1886. // the first two "Record()" will be the comment lines
  1887. // and they show up as len() == 1
  1888. // the first of which is "#Version"
  1889. // the second of which is "#Fields: "
  1890. if len(rec) != 1 {
  1891. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1892. continue
  1893. }
  1894. if len(foundVersion) == 0 {
  1895. spotFeedVersion := rec[0]
  1896. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1897. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1898. if matches != nil {
  1899. foundVersion = matches[1]
  1900. if foundVersion != supportedSpotFeedVersion {
  1901. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1902. break
  1903. }
  1904. }
  1905. continue
  1906. } else if strings.Index(rec[0], "#") == 0 {
  1907. continue
  1908. } else {
  1909. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1910. continue
  1911. }
  1912. } else if err != nil {
  1913. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1914. continue
  1915. }
  1916. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  1917. spots[spot.InstanceID] = &spot
  1918. }
  1919. gr.Close()
  1920. }
  1921. return spots, nil
  1922. }
  1923. func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1924. /*
  1925. numReserved := len(a.ReservedInstances)
  1926. // Early return if no reserved instance data loaded
  1927. if numReserved == 0 {
  1928. klog.V(4).Infof("[Reserved] No Reserved Instances")
  1929. return
  1930. }
  1931. cfg, err := a.GetConfig()
  1932. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  1933. if err != nil {
  1934. klog.V(3).Infof("Could not parse default cpu price")
  1935. defaultCPU = 0.031611
  1936. }
  1937. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  1938. if err != nil {
  1939. klog.V(3).Infof("Could not parse default ram price")
  1940. defaultRAM = 0.004237
  1941. }
  1942. cpuToRAMRatio := defaultCPU / defaultRAM
  1943. now := time.Now()
  1944. instances := make(map[string][]*AWSReservedInstance)
  1945. for _, r := range a.ReservedInstances {
  1946. if now.Before(r.StartDate) || now.After(r.EndDate) {
  1947. klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
  1948. continue
  1949. }
  1950. _, ok := instances[r.Region]
  1951. if !ok {
  1952. instances[r.Region] = []*AWSReservedInstance{r}
  1953. } else {
  1954. instances[r.Region] = append(instances[r.Region], r)
  1955. }
  1956. }
  1957. awsNodes := make(map[string]*v1.Node)
  1958. currentNodes := a.Clientset.GetAllNodes()
  1959. // Create a node name -> node map
  1960. for _, awsNode := range currentNodes {
  1961. awsNodes[awsNode.GetName()] = awsNode
  1962. }
  1963. // go through all provider nodes using k8s nodes for region
  1964. for nodeName, node := range nodes {
  1965. // Reset reserved allocation to prevent double allocation
  1966. node.Reserved = nil
  1967. kNode, ok := awsNodes[nodeName]
  1968. if !ok {
  1969. klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
  1970. continue
  1971. }
  1972. nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
  1973. if !ok {
  1974. klog.V(1).Infof("[Reserved] Could not find node region")
  1975. continue
  1976. }
  1977. reservedInstances, ok := instances[nodeRegion]
  1978. if !ok {
  1979. klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
  1980. continue
  1981. }
  1982. // Determine the InstanceType of the node
  1983. instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
  1984. if !ok {
  1985. continue
  1986. }
  1987. ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
  1988. if err != nil {
  1989. continue
  1990. }
  1991. ramGB := ramBytes / 1024 / 1024 / 1024
  1992. cpu, err := strconv.ParseFloat(node.VCPU, 64)
  1993. if err != nil {
  1994. continue
  1995. }
  1996. ramMultiple := cpu*cpuToRAMRatio + ramGB
  1997. node.Reserved = &ReservedInstanceData{
  1998. ReservedCPU: 0,
  1999. ReservedRAM: 0,
  2000. }
  2001. for i, reservedInstance := range reservedInstances {
  2002. if reservedInstance.InstanceType == instanceType {
  2003. // Use < 0 to mark as ALL
  2004. node.Reserved.ReservedCPU = -1
  2005. node.Reserved.ReservedRAM = -1
  2006. // Set Costs based on CPU/RAM ratios
  2007. ramPrice := reservedInstance.PricePerHour / ramMultiple
  2008. node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
  2009. node.Reserved.RAMCost = ramPrice
  2010. // Remove the reserve from the temporary slice to prevent
  2011. // being reallocated
  2012. instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
  2013. break
  2014. }
  2015. }
  2016. }*/
  2017. }
  2018. type AWSReservedInstance struct {
  2019. Zone string
  2020. Region string
  2021. InstanceType string
  2022. InstanceCount int64
  2023. InstanceTenacy string
  2024. StartDate time.Time
  2025. EndDate time.Time
  2026. PricePerHour float64
  2027. }
  2028. func (ari *AWSReservedInstance) String() string {
  2029. return fmt.Sprintf("[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]", ari.Zone, ari.Region, ari.InstanceType, ari.InstanceCount, ari.InstanceTenacy, ari.StartDate, ari.EndDate, ari.PricePerHour)
  2030. }
  2031. func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
  2032. return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
  2033. }
  2034. func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
  2035. var pricePerHour float64
  2036. if len(ri.RecurringCharges) > 0 {
  2037. for _, rc := range ri.RecurringCharges {
  2038. if isReservedInstanceHourlyPrice(rc) {
  2039. pricePerHour = *rc.Amount
  2040. break
  2041. }
  2042. }
  2043. }
  2044. // If we're still unable to resolve hourly price, try fixed -> hourly
  2045. if pricePerHour == 0 {
  2046. if ri.Duration != nil && ri.FixedPrice != nil {
  2047. var durHours float64
  2048. durSeconds := float64(*ri.Duration)
  2049. fixedPrice := float64(*ri.FixedPrice)
  2050. if durSeconds != 0 && fixedPrice != 0 {
  2051. durHours = durSeconds / 60 / 60
  2052. pricePerHour = fixedPrice / durHours
  2053. }
  2054. }
  2055. }
  2056. if pricePerHour == 0 {
  2057. return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
  2058. }
  2059. return pricePerHour, nil
  2060. }
  2061. func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
  2062. c := &aws.Config{
  2063. Region: aws.String(region),
  2064. }
  2065. s := session.Must(session.NewSession(c))
  2066. svc := ec2.New(s)
  2067. response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
  2068. if err != nil {
  2069. return nil, err
  2070. }
  2071. var reservedInstances []*AWSReservedInstance
  2072. for _, ri := range response.ReservedInstances {
  2073. var zone string
  2074. if ri.AvailabilityZone != nil {
  2075. zone = *ri.AvailabilityZone
  2076. }
  2077. pricePerHour, err := getReservedInstancePrice(ri)
  2078. if err != nil {
  2079. klog.V(1).Infof("Error Resolving Price: %s", err.Error())
  2080. continue
  2081. }
  2082. reservedInstances = append(reservedInstances, &AWSReservedInstance{
  2083. Zone: zone,
  2084. Region: region,
  2085. InstanceType: *ri.InstanceType,
  2086. InstanceCount: *ri.InstanceCount,
  2087. InstanceTenacy: *ri.InstanceTenancy,
  2088. StartDate: *ri.Start,
  2089. EndDate: *ri.End,
  2090. PricePerHour: pricePerHour,
  2091. })
  2092. }
  2093. return reservedInstances, nil
  2094. }
  2095. func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
  2096. err := a.configureAWSAuth()
  2097. if err != nil {
  2098. return nil, fmt.Errorf("Error Configuring aws auth: %s", err.Error())
  2099. }
  2100. var reservedInstances []*AWSReservedInstance
  2101. nodes := a.Clientset.GetAllNodes()
  2102. regionsSeen := make(map[string]bool)
  2103. for _, node := range nodes {
  2104. region, ok := node.Labels[v1.LabelZoneRegion]
  2105. if !ok {
  2106. continue
  2107. }
  2108. if regionsSeen[region] {
  2109. continue
  2110. }
  2111. ris, err := getRegionReservedInstances(region)
  2112. if err != nil {
  2113. klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
  2114. continue
  2115. }
  2116. regionsSeen[region] = true
  2117. reservedInstances = append(reservedInstances, ris...)
  2118. }
  2119. return reservedInstances, nil
  2120. }
  2121. func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  2122. checks := []*ServiceAccountCheck{}
  2123. for _, v := range a.ServiceAccountChecks {
  2124. checks = append(checks, v)
  2125. }
  2126. return &ServiceAccountStatus{
  2127. Checks: checks,
  2128. }
  2129. }