awsprovider.go 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "net/http"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "k8s.io/klog"
  19. "github.com/kubecost/cost-model/pkg/clustercache"
  20. "github.com/kubecost/cost-model/pkg/errors"
  21. "github.com/kubecost/cost-model/pkg/util"
  22. "github.com/aws/aws-sdk-go/aws"
  23. "github.com/aws/aws-sdk-go/aws/awserr"
  24. "github.com/aws/aws-sdk-go/aws/credentials"
  25. "github.com/aws/aws-sdk-go/aws/session"
  26. "github.com/aws/aws-sdk-go/service/athena"
  27. "github.com/aws/aws-sdk-go/service/ec2"
  28. "github.com/aws/aws-sdk-go/service/s3"
  29. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  30. "github.com/jszwec/csvutil"
  31. v1 "k8s.io/api/core/v1"
  32. )
  // Environment variable names, pricing defaults, and the update-type tags
// accepted by UpdateConfig.
const (
	awsAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
	awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"

	awsReservedInstancePricePerHour = 0.0287
	supportedSpotFeedVersion        = "1"

	// SpotInfoUpdateType selects the spot-feed document in UpdateConfig.
	SpotInfoUpdateType = "spotinfo"
	// AthenaInfoUpdateType selects the Athena document in UpdateConfig.
	AthenaInfoUpdateType = "athenainfo"

	defaultConfigPath = "/var/configs/"
)

// awsRegions lists the AWS region identifiers known to this provider, in a
// fixed order.
var awsRegions = []string{
	"us-east-2", "us-east-1", "us-west-1", "us-west-2",
	"ap-east-1", "ap-south-1", "ap-northeast-3", "ap-northeast-2",
	"ap-southeast-1", "ap-southeast-2", "ap-northeast-1",
	"ca-central-1",
	"cn-north-1", "cn-northwest-1",
	"eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-north-1",
	"me-south-1",
	"sa-east-1",
	"us-gov-east-1", "us-gov-west-1",
}
  65. // AWS represents an Amazon Provider
  66. type AWS struct {
  67. Pricing map[string]*AWSProductTerms
  68. SpotPricingByInstanceID map[string]*spotInfo
  69. SpotPricingUpdatedAt *time.Time
  70. SpotRefreshRunning bool
  71. SpotPricingLock sync.RWMutex
  72. RIPricingByInstanceID map[string]*RIData
  73. RIDataRunning bool
  74. RIDataLock sync.RWMutex
  75. ValidPricingKeys map[string]bool
  76. Clientset clustercache.ClusterCache
  77. BaseCPUPrice string
  78. BaseRAMPrice string
  79. BaseGPUPrice string
  80. BaseSpotCPUPrice string
  81. BaseSpotRAMPrice string
  82. SpotLabelName string
  83. SpotLabelValue string
  84. ServiceKeyName string
  85. ServiceKeySecret string
  86. SpotDataRegion string
  87. SpotDataBucket string
  88. SpotDataPrefix string
  89. ProjectID string
  90. DownloadPricingDataLock sync.RWMutex
  91. Config *ProviderConfig
  92. *CustomProvider
  93. }
  // AWSAccessKey is a static AWS credential pair. Its JSON form uses the keys
// "aws_access_key_id" and "aws_secret_access_key".
type AWSAccessKey struct {
	AccessKeyID     string `json:"aws_access_key_id"`     // access key ID
	SecretAccessKey string `json:"aws_secret_access_key"` // matching secret
}
  // Types mirroring the subset of the AWS Price List (AmazonEC2 offer file)
// JSON that this provider decodes. The json tags name the file's keys.
type (
	// AWSPricing maps a k8s node to an AWS Pricing "product".
	AWSPricing struct {
		Products map[string]*AWSProduct `json:"products"`
		Terms    AWSPricingTerms        `json:"terms"`
	}

	// AWSProduct represents a purchased SKU.
	AWSProduct struct {
		Sku        string               `json:"sku"`
		Attributes AWSProductAttributes `json:"attributes"`
	}

	// AWSProductAttributes represents metadata about the product used to map
	// it to a node or volume.
	AWSProductAttributes struct {
		Location        string `json:"location"`
		InstanceType    string `json:"instanceType"`
		Memory          string `json:"memory"`
		Storage         string `json:"storage"`
		VCpu            string `json:"vcpu"`
		UsageType       string `json:"usagetype"`
		OperatingSystem string `json:"operatingSystem"`
		PreInstalledSw  string `json:"preInstalledSw"`
		InstanceFamily  string `json:"instanceFamily"`
		GPU             string `json:"gpu"` // number of GPUs on the instance
	}

	// AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or
	// (TODO) Spot. Each map is keyed by SKU, then by offer term code.
	AWSPricingTerms struct {
		OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
		Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
	}

	// AWSOfferTerm is a sku extension used to pay for the node; its price
	// dimensions are keyed by rate code.
	AWSOfferTerm struct {
		Sku             string                  `json:"sku"`
		PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
	}

	// AWSRateCode encodes data about the price of a product.
	AWSRateCode struct {
		Unit         string          `json:"unit"`
		PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
	}

	// AWSCurrencyCode is the localized currency. (TODO: support non-USD)
	AWSCurrencyCode struct {
		USD string `json:"USD"`
	}
)
  140. // AWSProductTerms represents the full terms of the product
  141. type AWSProductTerms struct {
  142. Sku string `json:"sku"`
  143. OnDemand *AWSOfferTerm `json:"OnDemand"`
  144. Reserved *AWSOfferTerm `json:"Reserved"`
  145. Memory string `json:"memory"`
  146. Storage string `json:"storage"`
  147. VCpu string `json:"vcpu"`
  148. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  149. PV *PV `json:"pv"`
  150. }
  const (
	// ClusterIdEnvVar is the environment variable in which one can manually
	// set the ClusterId.
	ClusterIdEnvVar = "AWS_CLUSTER_ID"

	// OnDemandRateCode is appended to a node sku to form its on-demand offer
	// term code.
	OnDemandRateCode = ".JRTCKXETXF"
	// ReservedRateCode is appended to a node sku.
	ReservedRateCode = ".38NPMPTW36"
	// HourlyRateCode is appended to an offer term code to name its price
	// dimension.
	HourlyRateCode = ".6YS6EN2CT7"
)
  // volTypes translates in both directions between AWS EBS usage types (as they
// appear in the price list, region prefix removed) and EBS volume types (as
// used in K8s storage class parameters and the EC2 API). Both directions live
// in one map. "io1" maps back to the storage usage type, not the P-IOPS one.
var volTypes = map[string]string{
	// usage type -> volume type
	"EBS:VolumeUsage":        "standard",
	"EBS:VolumeUsage.gp2":    "gp2",
	"EBS:VolumeUsage.sc1":    "sc1",
	"EBS:VolumeUsage.st1":    "st1",
	"EBS:VolumeUsage.piops":  "io1",
	"EBS:VolumeP-IOPS.piops": "io1",

	// volume type -> usage type
	"standard": "EBS:VolumeUsage",
	"gp2":      "EBS:VolumeUsage.gp2",
	"sc1":      "EBS:VolumeUsage.sc1",
	"st1":      "EBS:VolumeUsage.st1",
	"io1":      "EBS:VolumeUsage.piops",
}
  // locationToRegion maps AWS location names, as they appear in the price list
// and billing data (e.g. "US East (Ohio)"), to region identifiers. It should
// cover every region in awsRegions; a location missing here yields an empty
// region in pricing keys, so that region's products never match a node.
var locationToRegion = map[string]string{
	"US East (Ohio)":             "us-east-2",
	"US East (N. Virginia)":      "us-east-1",
	"US West (N. California)":    "us-west-1",
	"US West (Oregon)":           "us-west-2",
	"Asia Pacific (Hong Kong)":   "ap-east-1",
	"Asia Pacific (Mumbai)":      "ap-south-1",
	"Asia Pacific (Osaka-Local)": "ap-northeast-3",
	"Asia Pacific (Seoul)":       "ap-northeast-2",
	"Asia Pacific (Singapore)":   "ap-southeast-1",
	"Asia Pacific (Sydney)":      "ap-southeast-2",
	"Asia Pacific (Tokyo)":       "ap-northeast-1",
	"Canada (Central)":           "ca-central-1",
	"China (Beijing)":            "cn-north-1",
	"China (Ningxia)":            "cn-northwest-1",
	"EU (Frankfurt)":             "eu-central-1",
	"EU (Ireland)":               "eu-west-1",
	"EU (London)":                "eu-west-2",
	"EU (Paris)":                 "eu-west-3",
	"EU (Stockholm)":             "eu-north-1",
	// me-south-1 is listed in awsRegions but previously had no location entry.
	"Middle East (Bahrain)":     "me-south-1",
	"South America (Sao Paulo)": "sa-east-1",
	"AWS GovCloud (US-East)":    "us-gov-east-1",
	"AWS GovCloud (US)":         "us-gov-west-1",
}
  // regionToBillingRegionCode maps a region identifier to the region-code prefix
// AWS uses in billing usage types (an empty string where no code is listed).
// It should cover every region in awsRegions.
var regionToBillingRegionCode = map[string]string{
	"us-east-2":      "USE2",
	"us-east-1":      "",
	"us-west-1":      "USW1",
	"us-west-2":      "USW2",
	"ap-east-1":      "APE1",
	"ap-south-1":     "APS3",
	"ap-northeast-3": "APN3",
	"ap-northeast-2": "APN2",
	"ap-southeast-1": "APS1",
	"ap-southeast-2": "APS2",
	"ap-northeast-1": "APN1",
	"ca-central-1":   "CAN1",
	"cn-north-1":     "",
	"cn-northwest-1": "",
	"eu-central-1":   "EUC1",
	"eu-west-1":      "EU",
	"eu-west-2":      "EUW2",
	"eu-west-3":      "EUW3",
	"eu-north-1":     "EUN1",
	// me-south-1 is listed in awsRegions but previously had no billing code.
	"me-south-1":    "MES1",
	"sa-east-1":     "SAE1",
	"us-gov-east-1": "UGE1",
	"us-gov-west-1": "UGW1",
}
  225. var loadedAWSSecret bool = false
  226. var awsSecret *AWSAccessKey = nil
  227. func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
  228. return ""
  229. }
  230. // KubeAttrConversion maps the k8s labels for region to an aws region
  231. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  232. operatingSystem = strings.ToLower(operatingSystem)
  233. region := locationToRegion[location]
  234. return region + "," + instanceType + "," + operatingSystem
  235. }
  type (
	// AwsSpotFeedInfo is the JSON document UpdateConfig accepts for
	// SpotInfoUpdateType: where the spot data feed lives, the service key
	// credentials, and the node label that marks spot instances.
	AwsSpotFeedInfo struct {
		BucketName       string `json:"bucketName"`
		Prefix           string `json:"prefix"`
		Region           string `json:"region"`
		AccountID        string `json:"projectID"` // note: serialized as "projectID"
		ServiceKeyName   string `json:"serviceKeyName"`
		ServiceKeySecret string `json:"serviceKeySecret"`
		SpotLabel        string `json:"spotLabel"`
		SpotLabelValue   string `json:"spotLabelValue"`
	}

	// AwsAthenaInfo is the JSON document UpdateConfig accepts for
	// AthenaInfoUpdateType: the Athena bucket, region, database and table,
	// plus service key credentials and the account ID.
	AwsAthenaInfo struct {
		AthenaBucketName string `json:"athenaBucketName"`
		AthenaRegion     string `json:"athenaRegion"`
		AthenaDatabase   string `json:"athenaDatabase"`
		AthenaTable      string `json:"athenaTable"`
		ServiceKeyName   string `json:"serviceKeyName"`
		ServiceKeySecret string `json:"serviceKeySecret"`
		AccountID        string `json:"projectID"` // note: serialized as "projectID"
	}
)
  255. func (aws *AWS) GetManagementPlatform() (string, error) {
  256. nodes := aws.Clientset.GetAllNodes()
  257. if len(nodes) > 0 {
  258. n := nodes[0]
  259. version := n.Status.NodeInfo.KubeletVersion
  260. if strings.Contains(version, "eks") {
  261. return "eks", nil
  262. }
  263. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  264. return "kops", nil
  265. }
  266. }
  267. return "", nil
  268. }
  269. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  270. c, err := aws.Config.GetCustomPricingData()
  271. if c.Discount == "" {
  272. c.Discount = "0%"
  273. }
  274. if c.NegotiatedDiscount == "" {
  275. c.NegotiatedDiscount = "0%"
  276. }
  277. if err != nil {
  278. return nil, err
  279. }
  280. return c, nil
  281. }
  282. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  283. return aws.Config.UpdateFromMap(a)
  284. }
  285. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  286. return aws.Config.Update(func(c *CustomPricing) error {
  287. if updateType == SpotInfoUpdateType {
  288. a := AwsSpotFeedInfo{}
  289. err := json.NewDecoder(r).Decode(&a)
  290. if err != nil {
  291. return err
  292. }
  293. c.ServiceKeyName = a.ServiceKeyName
  294. if a.ServiceKeySecret != "" {
  295. c.ServiceKeySecret = a.ServiceKeySecret
  296. }
  297. c.SpotDataPrefix = a.Prefix
  298. c.SpotDataBucket = a.BucketName
  299. c.ProjectID = a.AccountID
  300. c.SpotDataRegion = a.Region
  301. c.SpotLabel = a.SpotLabel
  302. c.SpotLabelValue = a.SpotLabelValue
  303. } else if updateType == AthenaInfoUpdateType {
  304. a := AwsAthenaInfo{}
  305. err := json.NewDecoder(r).Decode(&a)
  306. if err != nil {
  307. return err
  308. }
  309. c.AthenaBucketName = a.AthenaBucketName
  310. c.AthenaRegion = a.AthenaRegion
  311. c.AthenaDatabase = a.AthenaDatabase
  312. c.AthenaTable = a.AthenaTable
  313. c.ServiceKeyName = a.ServiceKeyName
  314. if a.ServiceKeySecret != "" {
  315. c.ServiceKeySecret = a.ServiceKeySecret
  316. }
  317. c.AthenaProjectID = a.AccountID
  318. } else {
  319. a := make(map[string]interface{})
  320. err := json.NewDecoder(r).Decode(&a)
  321. if err != nil {
  322. return err
  323. }
  324. for k, v := range a {
  325. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  326. vstr, ok := v.(string)
  327. if ok {
  328. err := SetCustomPricingField(c, kUpper, vstr)
  329. if err != nil {
  330. return err
  331. }
  332. } else {
  333. sci := v.(map[string]interface{})
  334. sc := make(map[string]string)
  335. for k, val := range sci {
  336. sc[k] = val.(string)
  337. }
  338. c.SharedCosts = sc //todo: support reflection/multiple map fields
  339. }
  340. }
  341. }
  342. remoteEnabled := os.Getenv(remoteEnabled)
  343. if remoteEnabled == "true" {
  344. err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
  345. if err != nil {
  346. return err
  347. }
  348. }
  349. return nil
  350. })
  351. }
  // awsKey carries the node attributes needed to derive an AWS pricing key.
type awsKey struct {
	// A node whose label SpotLabelName has the value SpotLabelValue is priced
	// as spot.
	SpotLabelName, SpotLabelValue string
	// Labels are the node's labels.
	Labels map[string]string
	// ProviderID is of the form aws:///<zone>/<instance-id>.
	ProviderID string
}
  358. func (k *awsKey) GPUType() string {
  359. return ""
  360. }
  361. func (k *awsKey) ID() string {
  362. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  363. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  364. if matchNum == 2 {
  365. return group
  366. }
  367. }
  368. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  369. return ""
  370. }
  371. func (k *awsKey) Features() string {
  372. instanceType := k.Labels[v1.LabelInstanceType]
  373. var operatingSystem string
  374. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  375. if !ok {
  376. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  377. }
  378. region := k.Labels[v1.LabelZoneRegion]
  379. key := region + "," + instanceType + "," + operatingSystem
  380. usageType := "preemptible"
  381. spotKey := key + "," + usageType
  382. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  383. return spotKey
  384. }
  385. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  386. return spotKey
  387. }
  388. return key
  389. }
  390. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  391. pricing, ok := aws.Pricing[pvk.Features()]
  392. if !ok {
  393. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  394. return &PV{}, nil
  395. }
  396. return pricing.PV, nil
  397. }
  // awsPVKey carries the persistent-volume attributes needed to derive an EBS
// pricing key.
type awsPVKey struct {
	Labels                 map[string]string // the PV's labels
	StorageClassParameters map[string]string // parameters of the PV's storage class
	StorageClassName       string            // the PV's storage class name
	Name                   string            // the PV's name
	DefaultRegion          string            // region supplied by the caller of GetPVKey
}
  405. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  406. return &awsPVKey{
  407. Labels: pv.Labels,
  408. StorageClassName: pv.Spec.StorageClassName,
  409. StorageClassParameters: parameters,
  410. Name: pv.Name,
  411. DefaultRegion: defaultRegion,
  412. }
  413. }
  414. func (key *awsPVKey) GetStorageClass() string {
  415. return key.StorageClassName
  416. }
  417. func (key *awsPVKey) Features() string {
  418. storageClass := key.StorageClassParameters["type"]
  419. if storageClass == "standard" {
  420. storageClass = "gp2"
  421. }
  422. // Storage class names are generally EBS volume types (gp2)
  423. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  424. // Converts between the 2
  425. region := key.Labels[v1.LabelZoneRegion]
  426. //if region == "" {
  427. // region = "us-east-1"
  428. //}
  429. class, ok := volTypes[storageClass]
  430. if !ok {
  431. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  432. }
  433. return region + "," + class
  434. }
  435. // GetKey maps node labels to information needed to retrieve pricing data
  436. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  437. return &awsKey{
  438. SpotLabelName: aws.SpotLabelName,
  439. SpotLabelValue: aws.SpotLabelValue,
  440. Labels: labels,
  441. ProviderID: labels["providerID"],
  442. }
  443. }
  444. func (aws *AWS) isPreemptible(key string) bool {
  445. s := strings.Split(key, ",")
  446. if len(s) == 4 && s[3] == "preemptible" {
  447. return true
  448. }
  449. return false
  450. }
  451. // DownloadPricingData fetches data from the AWS Pricing API
  452. func (aws *AWS) DownloadPricingData() error {
  453. aws.DownloadPricingDataLock.Lock()
  454. defer aws.DownloadPricingDataLock.Unlock()
  455. c, err := aws.Config.GetCustomPricingData()
  456. if err != nil {
  457. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  458. }
  459. aws.BaseCPUPrice = c.CPU
  460. aws.BaseRAMPrice = c.RAM
  461. aws.BaseGPUPrice = c.GPU
  462. aws.BaseSpotCPUPrice = c.SpotCPU
  463. aws.BaseSpotRAMPrice = c.SpotRAM
  464. aws.SpotLabelName = c.SpotLabel
  465. aws.SpotLabelValue = c.SpotLabelValue
  466. aws.SpotDataBucket = c.SpotDataBucket
  467. aws.SpotDataPrefix = c.SpotDataPrefix
  468. aws.ProjectID = c.ProjectID
  469. aws.SpotDataRegion = c.SpotDataRegion
  470. skn, sks := aws.getAWSAuth(false, c)
  471. aws.ServiceKeyName = skn
  472. aws.ServiceKeySecret = sks
  473. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  474. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  475. }
  476. nodeList := aws.Clientset.GetAllNodes()
  477. inputkeys := make(map[string]bool)
  478. for _, n := range nodeList {
  479. labels := n.GetObjectMeta().GetLabels()
  480. key := aws.GetKey(labels, n)
  481. inputkeys[key.Features()] = true
  482. }
  483. pvList := aws.Clientset.GetAllPersistentVolumes()
  484. storageClasses := aws.Clientset.GetAllStorageClasses()
  485. storageClassMap := make(map[string]map[string]string)
  486. for _, storageClass := range storageClasses {
  487. params := storageClass.Parameters
  488. storageClassMap[storageClass.ObjectMeta.Name] = params
  489. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  490. storageClassMap["default"] = params
  491. storageClassMap[""] = params
  492. }
  493. }
  494. pvkeys := make(map[string]PVKey)
  495. for _, pv := range pvList {
  496. params, ok := storageClassMap[pv.Spec.StorageClassName]
  497. if !ok {
  498. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  499. continue
  500. }
  501. key := aws.GetPVKey(pv, params, "")
  502. pvkeys[key.Features()] = key
  503. }
  504. // RIDataRunning establishes the existance of the goroutine. Since it's possible we
  505. // run multiple downloads, we don't want to create multiple go routines if one already exists
  506. if !aws.RIDataRunning && c.AthenaBucketName != "" {
  507. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  508. if err != nil {
  509. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  510. } else { // If we make one successful run, check on new reservation data every hour
  511. go func() {
  512. defer errors.HandlePanic()
  513. aws.RIDataRunning = true
  514. for {
  515. klog.Infof("Reserved Instance watcher running... next update in 1h")
  516. time.Sleep(time.Hour)
  517. err := aws.GetReservationDataFromAthena()
  518. if err != nil {
  519. klog.Infof("Error updating RI data: %s", err.Error())
  520. }
  521. }
  522. }()
  523. }
  524. }
  525. aws.Pricing = make(map[string]*AWSProductTerms)
  526. aws.ValidPricingKeys = make(map[string]bool)
  527. skusToKeys := make(map[string]string)
  528. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  529. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  530. resp, err := http.Get(pricingURL)
  531. if err != nil {
  532. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  533. return err
  534. }
  535. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  536. dec := json.NewDecoder(resp.Body)
  537. for {
  538. t, err := dec.Token()
  539. if err == io.EOF {
  540. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  541. break
  542. }
  543. if t == "products" {
  544. _, err := dec.Token() // this should parse the opening "{""
  545. if err != nil {
  546. return err
  547. }
  548. for dec.More() {
  549. _, err := dec.Token() // the sku token
  550. if err != nil {
  551. return err
  552. }
  553. product := &AWSProduct{}
  554. err = dec.Decode(&product)
  555. if err != nil {
  556. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  557. break
  558. }
  559. if product.Attributes.PreInstalledSw == "NA" &&
  560. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  561. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  562. spotKey := key + ",preemptible"
  563. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  564. productTerms := &AWSProductTerms{
  565. Sku: product.Sku,
  566. Memory: product.Attributes.Memory,
  567. Storage: product.Attributes.Storage,
  568. VCpu: product.Attributes.VCpu,
  569. GPU: product.Attributes.GPU,
  570. }
  571. aws.Pricing[key] = productTerms
  572. aws.Pricing[spotKey] = productTerms
  573. skusToKeys[product.Sku] = key
  574. }
  575. aws.ValidPricingKeys[key] = true
  576. aws.ValidPricingKeys[spotKey] = true
  577. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  578. // UsageTypes may be prefixed with a region code - we're removing this when using
  579. // volTypes to keep lookups generic
  580. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  581. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  582. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  583. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  584. spotKey := key + ",preemptible"
  585. pv := &PV{
  586. Class: volTypes[usageTypeNoRegion],
  587. Region: locationToRegion[product.Attributes.Location],
  588. }
  589. productTerms := &AWSProductTerms{
  590. Sku: product.Sku,
  591. PV: pv,
  592. }
  593. aws.Pricing[key] = productTerms
  594. aws.Pricing[spotKey] = productTerms
  595. skusToKeys[product.Sku] = key
  596. aws.ValidPricingKeys[key] = true
  597. aws.ValidPricingKeys[spotKey] = true
  598. }
  599. }
  600. }
  601. if t == "terms" {
  602. _, err := dec.Token() // this should parse the opening "{""
  603. if err != nil {
  604. return err
  605. }
  606. termType, err := dec.Token()
  607. if err != nil {
  608. return err
  609. }
  610. if termType == "OnDemand" {
  611. _, err := dec.Token()
  612. if err != nil { // again, should parse an opening "{"
  613. return err
  614. }
  615. for dec.More() {
  616. sku, err := dec.Token()
  617. if err != nil {
  618. return err
  619. }
  620. _, err = dec.Token() // another opening "{"
  621. if err != nil {
  622. return err
  623. }
  624. skuOnDemand, err := dec.Token()
  625. if err != nil {
  626. return err
  627. }
  628. offerTerm := &AWSOfferTerm{}
  629. err = dec.Decode(&offerTerm)
  630. if err != nil {
  631. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  632. }
  633. if sku.(string)+OnDemandRateCode == skuOnDemand {
  634. key, ok := skusToKeys[sku.(string)]
  635. spotKey := key + ",preemptible"
  636. if ok {
  637. aws.Pricing[key].OnDemand = offerTerm
  638. aws.Pricing[spotKey].OnDemand = offerTerm
  639. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  640. // If the specific UsageType is the per IO cost used on io1 volumes
  641. // we need to add the per IO cost to the io1 PV cost
  642. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  643. // Add the per IO cost to the PV object for the io1 volume type
  644. aws.Pricing[key].PV.CostPerIO = cost
  645. } else if strings.Contains(key, "EBS:Volume") {
  646. // If volume, we need to get hourly cost and add it to the PV object
  647. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  648. costFloat, _ := strconv.ParseFloat(cost, 64)
  649. hourlyPrice := costFloat / 730
  650. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  651. }
  652. }
  653. }
  654. _, err = dec.Token()
  655. if err != nil {
  656. return err
  657. }
  658. }
  659. _, err = dec.Token()
  660. if err != nil {
  661. return err
  662. }
  663. }
  664. }
  665. }
  666. // Always run spot pricing refresh when performing download
  667. aws.refreshSpotPricing(true)
  668. // Only start a single refresh goroutine
  669. if !aws.SpotRefreshRunning {
  670. aws.SpotRefreshRunning = true
  671. go func() {
  672. defer errors.HandlePanic()
  673. for {
  674. klog.Infof("Spot Pricing Refresh scheduled in 1 hr.")
  675. time.Sleep(time.Hour)
  676. // Reoccurring refresh checks update times
  677. aws.refreshSpotPricing(false)
  678. }
  679. }()
  680. }
  681. return nil
  682. }
  683. func (aws *AWS) refreshSpotPricing(force bool) {
  684. aws.SpotPricingLock.Lock()
  685. defer aws.SpotPricingLock.Unlock()
  686. now := time.Now().UTC()
  687. updateTime := now.Add(-time.Hour)
  688. // Return if there was an update time set and an hour hasn't elapsed
  689. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  690. return
  691. }
  692. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  693. if err != nil {
  694. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  695. return
  696. }
  697. // update time last updated
  698. aws.SpotPricingUpdatedAt = &now
  699. aws.SpotPricingByInstanceID = sp
  700. }
  701. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  702. func (aws *AWS) NetworkPricing() (*Network, error) {
  703. cpricing, err := aws.Config.GetCustomPricingData()
  704. if err != nil {
  705. return nil, err
  706. }
  707. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  708. if err != nil {
  709. return nil, err
  710. }
  711. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  712. if err != nil {
  713. return nil, err
  714. }
  715. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  716. if err != nil {
  717. return nil, err
  718. }
  719. return &Network{
  720. ZoneNetworkEgressCost: znec,
  721. RegionNetworkEgressCost: rnec,
  722. InternetNetworkEgressCost: inec,
  723. }, nil
  724. }
  725. // AllNodePricing returns all the billing data fetched.
  726. func (aws *AWS) AllNodePricing() (interface{}, error) {
  727. aws.DownloadPricingDataLock.RLock()
  728. defer aws.DownloadPricingDataLock.RUnlock()
  729. return aws.Pricing, nil
  730. }
  731. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  732. aws.SpotPricingLock.RLock()
  733. defer aws.SpotPricingLock.RUnlock()
  734. info, ok := aws.SpotPricingByInstanceID[instanceID]
  735. return info, ok
  736. }
  737. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  738. aws.RIDataLock.RLock()
  739. defer aws.RIDataLock.RUnlock()
  740. data, ok := aws.RIPricingByInstanceID[instanceID]
  741. return data, ok
  742. }
  743. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  744. key := k.Features()
  745. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  746. var spotcost string
  747. klog.V(3).Infof("Looking up spot data from feed for node %s", k.ID())
  748. arr := strings.Split(spotInfo.Charge, " ")
  749. if len(arr) == 2 {
  750. spotcost = arr[0]
  751. } else {
  752. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  753. }
  754. return &Node{
  755. Cost: spotcost,
  756. VCPU: terms.VCpu,
  757. RAM: terms.Memory,
  758. GPU: terms.GPU,
  759. Storage: terms.Storage,
  760. BaseCPUPrice: aws.BaseCPUPrice,
  761. BaseRAMPrice: aws.BaseRAMPrice,
  762. BaseGPUPrice: aws.BaseGPUPrice,
  763. UsageType: usageType,
  764. }, nil
  765. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  766. klog.Infof("Node %s marked preemitible but we have no data in spot feed", k.ID())
  767. return &Node{
  768. VCPU: terms.VCpu,
  769. VCPUCost: aws.BaseSpotCPUPrice,
  770. RAM: terms.Memory,
  771. GPU: terms.GPU,
  772. RAMCost: aws.BaseSpotRAMPrice,
  773. Storage: terms.Storage,
  774. BaseCPUPrice: aws.BaseCPUPrice,
  775. BaseRAMPrice: aws.BaseRAMPrice,
  776. BaseGPUPrice: aws.BaseGPUPrice,
  777. UsageType: usageType,
  778. }, nil
  779. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  780. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  781. return &Node{
  782. Cost: strCost,
  783. VCPU: terms.VCpu,
  784. RAM: terms.Memory,
  785. GPU: terms.GPU,
  786. Storage: terms.Storage,
  787. BaseCPUPrice: aws.BaseCPUPrice,
  788. BaseRAMPrice: aws.BaseRAMPrice,
  789. BaseGPUPrice: aws.BaseGPUPrice,
  790. UsageType: usageType,
  791. }, nil
  792. }
  793. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  794. if !ok {
  795. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  796. }
  797. cost := c.PricePerUnit.USD
  798. return &Node{
  799. Cost: cost,
  800. VCPU: terms.VCpu,
  801. RAM: terms.Memory,
  802. GPU: terms.GPU,
  803. Storage: terms.Storage,
  804. BaseCPUPrice: aws.BaseCPUPrice,
  805. BaseRAMPrice: aws.BaseRAMPrice,
  806. BaseGPUPrice: aws.BaseGPUPrice,
  807. UsageType: usageType,
  808. }, nil
  809. }
// NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
//
// Lookup order for key := k.Features():
//   - if aws.Pricing has an entry, the Node comes from createNode;
//   - else if the key is listed in aws.ValidPricingKeys, pricing data is
//     re-downloaded and the lookup is retried once;
//   - otherwise a Node carrying only base prices (UsesBaseCPUPrice) is
//     returned with a nil error.
//
// If the re-download fails, or the key is still missing afterwards, a
// base-price Node is returned together with a non-nil error.
func (aws *AWS) NodePricing(k Key) (*Node, error) {
	aws.DownloadPricingDataLock.RLock()
	defer aws.DownloadPricingDataLock.RUnlock()
	key := k.Features()
	usageType := "ondemand"
	if aws.isPreemptible(key) {
		usageType = "preemptible"
	}
	terms, ok := aws.Pricing[key]
	if ok {
		return aws.createNode(terms, usageType, k)
	} else if _, ok := aws.ValidPricingKeys[key]; ok {
		// The key is known to be valid but its terms are missing, so refresh the
		// pricing data. The read lock is released for the download.
		// NOTE(review): presumably DownloadPricingData takes
		// DownloadPricingDataLock for writing and would deadlock otherwise. Confirm.
		aws.DownloadPricingDataLock.RUnlock()
		err := aws.DownloadPricingData()
		// Re-acquire so that the deferred RUnlock above stays balanced.
		aws.DownloadPricingDataLock.RLock()
		if err != nil {
			return &Node{
				Cost:             aws.BaseCPUPrice,
				BaseCPUPrice:     aws.BaseCPUPrice,
				BaseRAMPrice:     aws.BaseRAMPrice,
				BaseGPUPrice:     aws.BaseGPUPrice,
				UsageType:        usageType,
				UsesBaseCPUPrice: true,
			}, err
		}
		terms, termsOk := aws.Pricing[key]
		if !termsOk {
			return &Node{
				Cost:             aws.BaseCPUPrice,
				BaseCPUPrice:     aws.BaseCPUPrice,
				BaseRAMPrice:     aws.BaseRAMPrice,
				BaseGPUPrice:     aws.BaseGPUPrice,
				UsageType:        usageType,
				UsesBaseCPUPrice: true,
			}, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
		}
		return aws.createNode(terms, usageType, k)
	} else { // Fall back to base pricing if we can't find the key.
		klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
		return &Node{
			Cost:             aws.BaseCPUPrice,
			BaseCPUPrice:     aws.BaseCPUPrice,
			BaseRAMPrice:     aws.BaseRAMPrice,
			BaseGPUPrice:     aws.BaseGPUPrice,
			UsageType:        usageType,
			UsesBaseCPUPrice: true,
		}, nil
	}
}
  860. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  861. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  862. defaultClusterName := "AWS Cluster #1"
  863. c, err := awsProvider.GetConfig()
  864. if err != nil {
  865. return nil, err
  866. }
  867. remote := os.Getenv(remoteEnabled)
  868. remoteEnabled := false
  869. if os.Getenv(remote) == "true" {
  870. remoteEnabled = true
  871. }
  872. if c.ClusterName != "" {
  873. m := make(map[string]string)
  874. m["name"] = c.ClusterName
  875. m["provider"] = "AWS"
  876. m["id"] = os.Getenv(clusterIDKey)
  877. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  878. return m, nil
  879. }
  880. makeStructure := func(clusterName string) (map[string]string, error) {
  881. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  882. m := make(map[string]string)
  883. m["name"] = clusterName
  884. m["provider"] = "AWS"
  885. m["id"] = os.Getenv(clusterIDKey)
  886. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  887. return m, nil
  888. }
  889. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  890. if len(maybeClusterId) != 0 {
  891. return makeStructure(maybeClusterId)
  892. }
  893. // TODO: This should be cached, it can take a long time to hit the API
  894. //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  895. //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  896. //klog.Infof("nodelist get here %s", time.Now())
  897. //nodeList := awsProvider.Clientset.GetAllNodes()
  898. //klog.Infof("nodelist done here %s", time.Now())
  899. /*for _, n := range nodeList {
  900. region := ""
  901. instanceId := ""
  902. providerId := n.Spec.ProviderID
  903. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  904. if matchNum == 1 {
  905. region = group
  906. } else if matchNum == 2 {
  907. instanceId = group
  908. }
  909. }
  910. if len(instanceId) == 0 {
  911. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  912. continue
  913. }
  914. c := &aws.Config{
  915. Region: aws.String(region),
  916. }
  917. s := session.Must(session.NewSession(c))
  918. ec2Svc := ec2.New(s)
  919. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  920. InstanceIds: []*string{
  921. aws.String(instanceId),
  922. },
  923. })
  924. if diErr != nil {
  925. klog.Infof("Error describing instances: %s", diErr)
  926. continue
  927. }
  928. if len(di.Reservations) != 1 {
  929. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  930. continue
  931. }
  932. res := di.Reservations[0]
  933. if len(res.Instances) != 1 {
  934. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  935. continue
  936. }
  937. inst := res.Instances[0]
  938. for _, tag := range inst.Tags {
  939. tagKey := *tag.Key
  940. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  941. if matchNum != 1 {
  942. continue
  943. }
  944. return makeStructure(group)
  945. }
  946. }
  947. }*/
  948. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  949. return makeStructure(defaultClusterName)
  950. }
  951. // Gets the aws key id and secret
  952. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  953. // 1. Check config values first (set from frontend UI)
  954. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  955. return cp.ServiceKeyName, cp.ServiceKeySecret
  956. }
  957. // 2. Check for secret
  958. s, _ := aws.loadAWSAuthSecret(forceReload)
  959. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  960. return s.AccessKeyID, s.SecretAccessKey
  961. }
  962. // 3. Fall back to env vars
  963. return os.Getenv(awsAccessKeyIDEnvVar), os.Getenv(awsAccessKeySecretEnvVar)
  964. }
  965. // Load once and cache the result (even on failure). This is an install time secret, so
  966. // we don't expect the secret to change. If it does, however, we can force reload using
  967. // the input parameter.
  968. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  969. if !force && loadedAWSSecret {
  970. return awsSecret, nil
  971. }
  972. loadedAWSSecret = true
  973. exists, err := util.FileExists(authSecretPath)
  974. if !exists || err != nil {
  975. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  976. }
  977. result, err := ioutil.ReadFile(authSecretPath)
  978. if err != nil {
  979. return nil, err
  980. }
  981. var ak AWSAccessKey
  982. err = json.Unmarshal(result, &ak)
  983. if err != nil {
  984. return nil, err
  985. }
  986. awsSecret = &ak
  987. return awsSecret, nil
  988. }
  989. func (aws *AWS) configureAWSAuth() error {
  990. accessKeyID := aws.ServiceKeyName
  991. accessKeySecret := aws.ServiceKeySecret
  992. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  993. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  994. if err != nil {
  995. return err
  996. }
  997. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  998. if err != nil {
  999. return err
  1000. }
  1001. }
  1002. return nil
  1003. }
  1004. func getClusterConfig(ccFile string) (map[string]string, error) {
  1005. clusterConfig, err := os.Open(ccFile)
  1006. if err != nil {
  1007. return nil, err
  1008. }
  1009. defer clusterConfig.Close()
  1010. b, err := ioutil.ReadAll(clusterConfig)
  1011. if err != nil {
  1012. return nil, err
  1013. }
  1014. var clusterConf map[string]string
  1015. err = json.Unmarshal([]byte(b), &clusterConf)
  1016. if err != nil {
  1017. return nil, err
  1018. }
  1019. return clusterConf, nil
  1020. }
  1021. // SetKeyEnv ensures that the two environment variables necessary to configure
  1022. // a new AWS Session are set.
  1023. func (a *AWS) SetKeyEnv() error {
  1024. // TODO add this to the helm chart, mirroring the cost-model
  1025. // configPath := os.Getenv("CONFIG_PATH")
  1026. configPath := defaultConfigPath
  1027. path := configPath + "aws.json"
  1028. if _, err := os.Stat(path); err != nil {
  1029. if os.IsNotExist(err) {
  1030. log.Printf("error: file %s does not exist", path)
  1031. } else {
  1032. log.Printf("error: %s", err)
  1033. }
  1034. return err
  1035. }
  1036. jsonFile, err := os.Open(path)
  1037. defer jsonFile.Close()
  1038. configMap := map[string]string{}
  1039. configBytes, err := ioutil.ReadAll(jsonFile)
  1040. if err != nil {
  1041. return err
  1042. }
  1043. json.Unmarshal([]byte(configBytes), &configMap)
  1044. keyName := configMap["awsServiceKeyName"]
  1045. keySecret := configMap["awsServiceKeySecret"]
  1046. // These are required before calling NewEnvCredentials below
  1047. os.Setenv("AWS_ACCESS_KEY_ID", keyName)
  1048. os.Setenv("AWS_SECRET_ACCESS_KEY", keySecret)
  1049. return nil
  1050. }
  1051. func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
  1052. sess, err := session.NewSession(&aws.Config{
  1053. Region: aws.String(region),
  1054. Credentials: credentials.NewEnvCredentials(),
  1055. })
  1056. if err != nil {
  1057. return nil, err
  1058. }
  1059. ec2Svc := ec2.New(sess)
  1060. return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
  1061. }
  1062. func (a *AWS) GetAddresses() ([]byte, error) {
  1063. if err := a.SetKeyEnv(); err != nil {
  1064. return nil, err
  1065. }
  1066. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1067. errorCh := make(chan error, len(awsRegions))
  1068. var wg sync.WaitGroup
  1069. wg.Add(len(awsRegions))
  1070. // Get volumes from each AWS region
  1071. for _, r := range awsRegions {
  1072. // Fetch IP address response and send results and errors to their
  1073. // respective channels
  1074. go func(region string) {
  1075. defer wg.Done()
  1076. defer errors.HandlePanic()
  1077. // Query for first page of volume results
  1078. resp, err := a.getAddressesForRegion(region)
  1079. if err != nil {
  1080. if aerr, ok := err.(awserr.Error); ok {
  1081. switch aerr.Code() {
  1082. default:
  1083. errorCh <- aerr
  1084. }
  1085. return
  1086. } else {
  1087. errorCh <- err
  1088. return
  1089. }
  1090. }
  1091. addressCh <- resp
  1092. }(r)
  1093. }
  1094. // Close the result channels after everything has been sent
  1095. go func() {
  1096. defer errors.HandlePanic()
  1097. wg.Wait()
  1098. close(errorCh)
  1099. close(addressCh)
  1100. }()
  1101. addresses := []*ec2.Address{}
  1102. for adds := range addressCh {
  1103. addresses = append(addresses, adds.Addresses...)
  1104. }
  1105. errors := []error{}
  1106. for err := range errorCh {
  1107. log.Printf("[Warning]: unable to get addresses: %s", err)
  1108. errors = append(errors, err)
  1109. }
  1110. // Return error if no addresses are returned
  1111. if len(errors) > 0 && len(addresses) == 0 {
  1112. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
  1113. }
  1114. // Format the response this way to match the JSON-encoded formatting of a single response
  1115. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1116. // a "Addresss" key at the top level.
  1117. return json.Marshal(map[string][]*ec2.Address{
  1118. "Addresses": addresses,
  1119. })
  1120. }
  1121. func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1122. sess, err := session.NewSession(&aws.Config{
  1123. Region: aws.String(region),
  1124. Credentials: credentials.NewEnvCredentials(),
  1125. })
  1126. if err != nil {
  1127. return nil, err
  1128. }
  1129. ec2Svc := ec2.New(sess)
  1130. return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
  1131. MaxResults: &maxResults,
  1132. NextToken: nextToken,
  1133. })
  1134. }
  1135. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1136. func (a *AWS) GetDisks() ([]byte, error) {
  1137. if err := a.SetKeyEnv(); err != nil {
  1138. return nil, err
  1139. }
  1140. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1141. errorCh := make(chan error, len(awsRegions))
  1142. var wg sync.WaitGroup
  1143. wg.Add(len(awsRegions))
  1144. // Get volumes from each AWS region
  1145. for _, r := range awsRegions {
  1146. // Fetch volume response and send results and errors to their
  1147. // respective channels
  1148. go func(region string) {
  1149. defer wg.Done()
  1150. defer errors.HandlePanic()
  1151. // Query for first page of volume results
  1152. resp, err := a.getDisksForRegion(region, 1000, nil)
  1153. if err != nil {
  1154. if aerr, ok := err.(awserr.Error); ok {
  1155. switch aerr.Code() {
  1156. default:
  1157. errorCh <- aerr
  1158. }
  1159. return
  1160. } else {
  1161. errorCh <- err
  1162. return
  1163. }
  1164. }
  1165. volumeCh <- resp
  1166. // A NextToken indicates more pages of results. Keep querying
  1167. // until all pages are retrieved.
  1168. for resp.NextToken != nil {
  1169. resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
  1170. if err != nil {
  1171. if aerr, ok := err.(awserr.Error); ok {
  1172. switch aerr.Code() {
  1173. default:
  1174. errorCh <- aerr
  1175. }
  1176. return
  1177. } else {
  1178. errorCh <- err
  1179. return
  1180. }
  1181. }
  1182. volumeCh <- resp
  1183. }
  1184. }(r)
  1185. }
  1186. // Close the result channels after everything has been sent
  1187. go func() {
  1188. defer errors.HandlePanic()
  1189. wg.Wait()
  1190. close(errorCh)
  1191. close(volumeCh)
  1192. }()
  1193. volumes := []*ec2.Volume{}
  1194. for vols := range volumeCh {
  1195. volumes = append(volumes, vols.Volumes...)
  1196. }
  1197. errors := []error{}
  1198. for err := range errorCh {
  1199. log.Printf("[Warning]: unable to get disks: %s", err)
  1200. errors = append(errors, err)
  1201. }
  1202. // Return error if no volumes are returned
  1203. if len(errors) > 0 && len(volumes) == 0 {
  1204. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
  1205. }
  1206. // Format the response this way to match the JSON-encoded formatting of a single response
  1207. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1208. // a "Volumes" key at the top level.
  1209. return json.Marshal(map[string][]*ec2.Volume{
  1210. "Volumes": volumes,
  1211. })
  1212. }
  1213. // ConvertToGlueColumnFormat takes a string and runs through various regex
  1214. // and string replacement statements to convert it to a format compatible
  1215. // with AWS Glue and Athena column names.
  1216. // Following guidance from AWS provided here ('Column Names' section):
  1217. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  1218. // It returns a string containing the column name in proper column name format and length.
  1219. func ConvertToGlueColumnFormat(column_name string) string {
  1220. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  1221. // An underscore is added in front of uppercase letters
  1222. capital_underscore := regexp.MustCompile(`[A-Z]`)
  1223. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  1224. // Any non-alphanumeric characters are replaced with an underscore
  1225. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  1226. final = no_space_punc.ReplaceAllString(final, "_")
  1227. // Duplicate underscores are removed
  1228. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  1229. final = no_dup_underscore.ReplaceAllString(final, "_")
  1230. // Any leading and trailing underscores are removed
  1231. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  1232. final = no_front_end_underscore.ReplaceAllString(final, "")
  1233. // Uppercase to lowercase
  1234. final = strings.ToLower(final)
  1235. // Longer column name than expected - remove _ left to right
  1236. allowed_col_len := 128
  1237. undersc_to_remove := len(final) - allowed_col_len
  1238. if undersc_to_remove > 0 {
  1239. final = strings.Replace(final, "_", "", undersc_to_remove)
  1240. }
  1241. // If removing all of the underscores still didn't
  1242. // make the column name < 128 characters, trim it!
  1243. if len(final) > allowed_col_len {
  1244. final = final[:allowed_col_len]
  1245. }
  1246. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  1247. return final
  1248. }
  1249. func generateAWSGroupBy(lastIdx int) string {
  1250. sequence := []string{}
  1251. for i := 1; i < lastIdx+1; i++ {
  1252. sequence = append(sequence, strconv.Itoa(i))
  1253. }
  1254. return strings.Join(sequence, ",")
  1255. }
  1256. func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
  1257. customPricing, err := a.GetConfig()
  1258. if err != nil {
  1259. return nil, err
  1260. }
  1261. if customPricing.ServiceKeyName != "" {
  1262. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1263. if err != nil {
  1264. return nil, err
  1265. }
  1266. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1267. if err != nil {
  1268. return nil, err
  1269. }
  1270. }
  1271. region := aws.String(customPricing.AthenaRegion)
  1272. resultsBucket := customPricing.AthenaBucketName
  1273. database := customPricing.AthenaDatabase
  1274. c := &aws.Config{
  1275. Region: region,
  1276. }
  1277. s := session.Must(session.NewSession(c))
  1278. svc := athena.New(s)
  1279. var e athena.StartQueryExecutionInput
  1280. var r athena.ResultConfiguration
  1281. r.SetOutputLocation(resultsBucket)
  1282. e.SetResultConfiguration(&r)
  1283. e.SetQueryString(query)
  1284. var q athena.QueryExecutionContext
  1285. q.SetDatabase(database)
  1286. e.SetQueryExecutionContext(&q)
  1287. res, err := svc.StartQueryExecution(&e)
  1288. if err != nil {
  1289. return nil, err
  1290. }
  1291. klog.V(2).Infof("StartQueryExecution result:")
  1292. klog.V(2).Infof(res.GoString())
  1293. var qri athena.GetQueryExecutionInput
  1294. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1295. var qrop *athena.GetQueryExecutionOutput
  1296. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1297. for {
  1298. qrop, err = svc.GetQueryExecution(&qri)
  1299. if err != nil {
  1300. return nil, err
  1301. }
  1302. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1303. break
  1304. }
  1305. time.Sleep(duration)
  1306. }
  1307. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1308. var ip athena.GetQueryResultsInput
  1309. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1310. return svc.GetQueryResults(&ip)
  1311. } else {
  1312. return nil, fmt.Errorf("No results available for %s", query)
  1313. }
  1314. }
// RIData is the reserved-instance cost data recorded for one resource, as
// populated from an Athena billing query in GetReservationDataFromAthena.
type RIData struct {
	ResourceID     string  // line_item_resource_id of the billing row
	EffectiveCost  float64 // reservation_effective_cost, parsed as a float
	ReservationARN string  // reservation_reservation_a_r_n of the billing row
	MostRecentDate string  // line_item_usage_start_date the data was taken from
}
  1321. func (a *AWS) GetReservationDataFromAthena() error {
  1322. cfg, err := a.GetConfig()
  1323. if err != nil {
  1324. return err
  1325. }
  1326. if cfg.AthenaBucketName == "" {
  1327. return fmt.Errorf("No Athena Bucket configured")
  1328. }
  1329. if a.RIPricingByInstanceID == nil {
  1330. a.RIPricingByInstanceID = make(map[string]*RIData)
  1331. }
  1332. tNow := time.Now()
  1333. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1334. start := tOneDayAgo.Format("2006-01-02")
  1335. end := tNow.Format("2006-01-02")
  1336. q := `SELECT
  1337. line_item_usage_start_date,
  1338. reservation_reservation_a_r_n,
  1339. line_item_resource_id,
  1340. reservation_effective_cost
  1341. FROM %s as cost_data
  1342. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1343. AND reservation_reservation_a_r_n <> '' ORDER BY
  1344. line_item_usage_start_date DESC`
  1345. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1346. op, err := a.QueryAthenaBillingData(query)
  1347. if err != nil {
  1348. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1349. }
  1350. klog.Infof("Fetching RI data...")
  1351. if len(op.ResultSet.Rows) > 1 {
  1352. a.RIDataLock.Lock()
  1353. mostRecentDate := ""
  1354. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1355. d := *r.Data[0].VarCharValue
  1356. if mostRecentDate == "" {
  1357. mostRecentDate = d
  1358. } else if mostRecentDate != d { // Get all most recent assignments
  1359. break
  1360. }
  1361. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1362. if err != nil {
  1363. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1364. }
  1365. r := &RIData{
  1366. ResourceID: *r.Data[2].VarCharValue,
  1367. EffectiveCost: cost,
  1368. ReservationARN: *r.Data[1].VarCharValue,
  1369. MostRecentDate: d,
  1370. }
  1371. a.RIPricingByInstanceID[r.ResourceID] = r
  1372. }
  1373. klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
  1374. for k, r := range a.RIPricingByInstanceID {
  1375. klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1376. }
  1377. a.RIDataLock.Unlock()
  1378. } else {
  1379. klog.Infof("No reserved instance data found")
  1380. }
  1381. return nil
  1382. }
  1383. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  1384. // "start" and "end" are dates of the format YYYY-MM-DD
  1385. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  1386. func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
  1387. customPricing, err := a.GetConfig()
  1388. if err != nil {
  1389. return nil, err
  1390. }
  1391. formattedAggregators := []string{}
  1392. for _, agg := range aggregators {
  1393. aggregator_column_name := "resource_tags_user_" + agg
  1394. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  1395. formattedAggregators = append(formattedAggregators, aggregator_column_name)
  1396. }
  1397. aggregatorNames := strings.Join(formattedAggregators, ",")
  1398. aggregatorOr := strings.Join(formattedAggregators, " <> '' OR ")
  1399. aggregatorOr = aggregatorOr + " <> ''"
  1400. filter_column_name := "resource_tags_user_" + filterType
  1401. filter_column_name = ConvertToGlueColumnFormat(filter_column_name)
  1402. var query string
  1403. var lastIdx int
  1404. if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
  1405. lastIdx = len(formattedAggregators) + 3
  1406. groupby := generateAWSGroupBy(lastIdx)
  1407. query = fmt.Sprintf(`SELECT
  1408. CAST(line_item_usage_start_date AS DATE) as start_date,
  1409. %s,
  1410. line_item_product_code,
  1411. %s,
  1412. SUM(line_item_blended_cost) as blended_cost
  1413. FROM %s as cost_data
  1414. WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1415. GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, aggregatorOr, groupby)
  1416. } else {
  1417. lastIdx = len(formattedAggregators) + 2
  1418. groupby := generateAWSGroupBy(lastIdx)
  1419. query = fmt.Sprintf(`SELECT
  1420. CAST(line_item_usage_start_date AS DATE) as start_date,
  1421. %s,
  1422. line_item_product_code,
  1423. SUM(line_item_blended_cost) as blended_cost
  1424. FROM %s as cost_data
  1425. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1426. GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
  1427. }
  1428. klog.V(3).Infof("Running Query: %s", query)
  1429. if customPricing.ServiceKeyName != "" {
  1430. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1431. if err != nil {
  1432. return nil, err
  1433. }
  1434. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1435. if err != nil {
  1436. return nil, err
  1437. }
  1438. }
  1439. region := aws.String(customPricing.AthenaRegion)
  1440. resultsBucket := customPricing.AthenaBucketName
  1441. database := customPricing.AthenaDatabase
  1442. c := &aws.Config{
  1443. Region: region,
  1444. }
  1445. s := session.Must(session.NewSession(c))
  1446. svc := athena.New(s)
  1447. var e athena.StartQueryExecutionInput
  1448. var r athena.ResultConfiguration
  1449. r.SetOutputLocation(resultsBucket)
  1450. e.SetResultConfiguration(&r)
  1451. e.SetQueryString(query)
  1452. var q athena.QueryExecutionContext
  1453. q.SetDatabase(database)
  1454. e.SetQueryExecutionContext(&q)
  1455. res, err := svc.StartQueryExecution(&e)
  1456. if err != nil {
  1457. return nil, err
  1458. }
  1459. klog.V(2).Infof("StartQueryExecution result:")
  1460. klog.V(2).Infof(res.GoString())
  1461. var qri athena.GetQueryExecutionInput
  1462. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1463. var qrop *athena.GetQueryExecutionOutput
  1464. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1465. for {
  1466. qrop, err = svc.GetQueryExecution(&qri)
  1467. if err != nil {
  1468. return nil, err
  1469. }
  1470. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1471. break
  1472. }
  1473. time.Sleep(duration)
  1474. }
  1475. var oocAllocs []*OutOfClusterAllocation
  1476. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1477. var ip athena.GetQueryResultsInput
  1478. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1479. op, err := svc.GetQueryResults(&ip)
  1480. if err != nil {
  1481. return nil, err
  1482. }
  1483. if len(op.ResultSet.Rows) > 1 {
  1484. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows))] {
  1485. cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
  1486. if err != nil {
  1487. return nil, err
  1488. }
  1489. environment := ""
  1490. for _, d := range r.Data[1 : len(formattedAggregators)+1] {
  1491. if *d.VarCharValue != "" {
  1492. environment = *d.VarCharValue // just set to the first nonempty match
  1493. }
  1494. break
  1495. }
  1496. ooc := &OutOfClusterAllocation{
  1497. Aggregator: strings.Join(aggregators, ","),
  1498. Environment: environment,
  1499. Service: *r.Data[len(formattedAggregators)+1].VarCharValue,
  1500. Cost: cost,
  1501. }
  1502. oocAllocs = append(oocAllocs, ooc)
  1503. }
  1504. } else {
  1505. klog.V(1).Infof("No results available for %s at database %s between %s and %s", strings.Join(formattedAggregators, ","), customPricing.AthenaTable, start, end)
  1506. }
  1507. }
  1508. if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
  1509. gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
  1510. if err != nil {
  1511. klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
  1512. }
  1513. gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
  1514. if err != nil {
  1515. klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
  1516. }
  1517. oocAllocs = append(oocAllocs, gcpOOC...)
  1518. }
  1519. return oocAllocs, nil
  1520. }
  1521. // QuerySQL can query a properly configured Athena database.
  1522. // Used to fetch billing data.
  1523. // Requires a json config in /var/configs with key region, output, and database.
  1524. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1525. customPricing, err := a.GetConfig()
  1526. if err != nil {
  1527. return nil, err
  1528. }
  1529. if customPricing.ServiceKeyName != "" {
  1530. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1531. if err != nil {
  1532. return nil, err
  1533. }
  1534. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1535. if err != nil {
  1536. return nil, err
  1537. }
  1538. }
  1539. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1540. if err != nil {
  1541. return nil, err
  1542. }
  1543. defer athenaConfigs.Close()
  1544. b, err := ioutil.ReadAll(athenaConfigs)
  1545. if err != nil {
  1546. return nil, err
  1547. }
  1548. var athenaConf map[string]string
  1549. json.Unmarshal([]byte(b), &athenaConf)
  1550. region := aws.String(customPricing.AthenaRegion)
  1551. resultsBucket := customPricing.AthenaBucketName
  1552. database := customPricing.AthenaDatabase
  1553. c := &aws.Config{
  1554. Region: region,
  1555. }
  1556. s := session.Must(session.NewSession(c))
  1557. svc := athena.New(s)
  1558. var e athena.StartQueryExecutionInput
  1559. var r athena.ResultConfiguration
  1560. r.SetOutputLocation(resultsBucket)
  1561. e.SetResultConfiguration(&r)
  1562. e.SetQueryString(query)
  1563. var q athena.QueryExecutionContext
  1564. q.SetDatabase(database)
  1565. e.SetQueryExecutionContext(&q)
  1566. res, err := svc.StartQueryExecution(&e)
  1567. if err != nil {
  1568. return nil, err
  1569. }
  1570. klog.V(2).Infof("StartQueryExecution result:")
  1571. klog.V(2).Infof(res.GoString())
  1572. var qri athena.GetQueryExecutionInput
  1573. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1574. var qrop *athena.GetQueryExecutionOutput
  1575. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1576. for {
  1577. qrop, err = svc.GetQueryExecution(&qri)
  1578. if err != nil {
  1579. return nil, err
  1580. }
  1581. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1582. break
  1583. }
  1584. time.Sleep(duration)
  1585. }
  1586. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1587. var ip athena.GetQueryResultsInput
  1588. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1589. op, err := svc.GetQueryResults(&ip)
  1590. if err != nil {
  1591. return nil, err
  1592. }
  1593. b, err := json.Marshal(op.ResultSet)
  1594. if err != nil {
  1595. return nil, err
  1596. }
  1597. return b, nil
  1598. }
  1599. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1600. }
  // spotInfo is one tab-separated record of the EC2 spot instance data feed.
//
// The csv tags name the feed columns. The order of the fields matters:
// csvutil.Header derives the header (column order) from the struct's field
// order, so fields must not be reordered.
type spotInfo struct {
	Timestamp   string `csv:"Timestamp"`   // record timestamp, kept as raw text
	UsageType   string `csv:"UsageType"`   // usage type column
	Operation   string `csv:"Operation"`   // operation column
	InstanceID  string `csv:"InstanceID"`  // key used when indexing parsed records
	MyBidID     string `csv:"MyBidID"`     // bid ID column
	MyMaxPrice  string `csv:"MyMaxPrice"`  // max price column, kept as raw text
	MarketPrice string `csv:"MarketPrice"` // market price column, kept as raw text
	Charge      string `csv:"Charge"`      // charge column, kept as raw text
	Version     string `csv:"Version"`     // version column
}
  1612. type fnames []*string
  1613. func (f fnames) Len() int {
  1614. return len(f)
  1615. }
  1616. func (f fnames) Swap(i, j int) {
  1617. f[i], f[j] = f[j], f[i]
  1618. }
  1619. func (f fnames) Less(i, j int) bool {
  1620. key1 := strings.Split(*f[i], ".")
  1621. key2 := strings.Split(*f[j], ".")
  1622. t1, err := time.Parse("2006-01-02-15", key1[1])
  1623. if err != nil {
  1624. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1625. return false
  1626. }
  1627. t2, err := time.Parse("2006-01-02-15", key2[1])
  1628. if err != nil {
  1629. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1630. return false
  1631. }
  1632. return t1.Before(t2)
  1633. }
  1634. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1635. // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1636. if accessKeyID != "" && accessKeySecret != "" {
  1637. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  1638. if err != nil {
  1639. return nil, err
  1640. }
  1641. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  1642. if err != nil {
  1643. return nil, err
  1644. }
  1645. }
  1646. s3Prefix := projectID
  1647. if len(prefix) != 0 {
  1648. s3Prefix = prefix + "/" + s3Prefix
  1649. }
  1650. c := aws.NewConfig().WithRegion(region)
  1651. s := session.Must(session.NewSession(c))
  1652. s3Svc := s3.New(s)
  1653. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1654. tNow := time.Now()
  1655. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1656. ls := &s3.ListObjectsInput{
  1657. Bucket: aws.String(bucket),
  1658. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1659. }
  1660. ls2 := &s3.ListObjectsInput{
  1661. Bucket: aws.String(bucket),
  1662. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1663. }
  1664. lso, err := s3Svc.ListObjects(ls)
  1665. if err != nil {
  1666. return nil, err
  1667. }
  1668. lsoLen := len(lso.Contents)
  1669. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1670. if lsoLen == 0 {
  1671. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1672. }
  1673. lso2, err := s3Svc.ListObjects(ls2)
  1674. if err != nil {
  1675. return nil, err
  1676. }
  1677. lso2Len := len(lso2.Contents)
  1678. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1679. if lso2Len == 0 {
  1680. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1681. }
  1682. var keys []*string
  1683. for _, obj := range lso.Contents {
  1684. keys = append(keys, obj.Key)
  1685. }
  1686. for _, obj := range lso2.Contents {
  1687. keys = append(keys, obj.Key)
  1688. }
  1689. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1690. header, err := csvutil.Header(spotInfo{}, "csv")
  1691. if err != nil {
  1692. return nil, err
  1693. }
  1694. fieldsPerRecord := len(header)
  1695. spots := make(map[string]*spotInfo)
  1696. for _, key := range keys {
  1697. getObj := &s3.GetObjectInput{
  1698. Bucket: aws.String(bucket),
  1699. Key: key,
  1700. }
  1701. buf := aws.NewWriteAtBuffer([]byte{})
  1702. _, err := downloader.Download(buf, getObj)
  1703. if err != nil {
  1704. return nil, err
  1705. }
  1706. r := bytes.NewReader(buf.Bytes())
  1707. gr, err := gzip.NewReader(r)
  1708. if err != nil {
  1709. return nil, err
  1710. }
  1711. csvReader := csv.NewReader(gr)
  1712. csvReader.Comma = '\t'
  1713. csvReader.FieldsPerRecord = fieldsPerRecord
  1714. dec, err := csvutil.NewDecoder(csvReader, header...)
  1715. if err != nil {
  1716. return nil, err
  1717. }
  1718. var foundVersion string
  1719. for {
  1720. spot := spotInfo{}
  1721. err := dec.Decode(&spot)
  1722. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1723. if err == io.EOF {
  1724. break
  1725. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1726. rec := dec.Record()
  1727. // the first two "Record()" will be the comment lines
  1728. // and they show up as len() == 1
  1729. // the first of which is "#Version"
  1730. // the second of which is "#Fields: "
  1731. if len(rec) != 1 {
  1732. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1733. continue
  1734. }
  1735. if len(foundVersion) == 0 {
  1736. spotFeedVersion := rec[0]
  1737. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1738. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1739. if matches != nil {
  1740. foundVersion = matches[1]
  1741. if foundVersion != supportedSpotFeedVersion {
  1742. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1743. break
  1744. }
  1745. }
  1746. continue
  1747. } else if strings.Index(rec[0], "#") == 0 {
  1748. continue
  1749. } else {
  1750. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1751. continue
  1752. }
  1753. } else if err != nil {
  1754. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1755. continue
  1756. }
  1757. klog.V(4).Infof("Found spot info %+v", spot)
  1758. spots[spot.InstanceID] = &spot
  1759. }
  1760. gr.Close()
  1761. }
  1762. return spots, nil
  1763. }
  1764. func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1765. /*
  1766. numReserved := len(a.ReservedInstances)
  1767. // Early return if no reserved instance data loaded
  1768. if numReserved == 0 {
  1769. klog.V(4).Infof("[Reserved] No Reserved Instances")
  1770. return
  1771. }
  1772. cfg, err := a.GetConfig()
  1773. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  1774. if err != nil {
  1775. klog.V(3).Infof("Could not parse default cpu price")
  1776. defaultCPU = 0.031611
  1777. }
  1778. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  1779. if err != nil {
  1780. klog.V(3).Infof("Could not parse default ram price")
  1781. defaultRAM = 0.004237
  1782. }
  1783. cpuToRAMRatio := defaultCPU / defaultRAM
  1784. now := time.Now()
  1785. instances := make(map[string][]*AWSReservedInstance)
  1786. for _, r := range a.ReservedInstances {
  1787. if now.Before(r.StartDate) || now.After(r.EndDate) {
  1788. klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
  1789. continue
  1790. }
  1791. _, ok := instances[r.Region]
  1792. if !ok {
  1793. instances[r.Region] = []*AWSReservedInstance{r}
  1794. } else {
  1795. instances[r.Region] = append(instances[r.Region], r)
  1796. }
  1797. }
  1798. awsNodes := make(map[string]*v1.Node)
  1799. currentNodes := a.Clientset.GetAllNodes()
  1800. // Create a node name -> node map
  1801. for _, awsNode := range currentNodes {
  1802. awsNodes[awsNode.GetName()] = awsNode
  1803. }
  1804. // go through all provider nodes using k8s nodes for region
  1805. for nodeName, node := range nodes {
  1806. // Reset reserved allocation to prevent double allocation
  1807. node.Reserved = nil
  1808. kNode, ok := awsNodes[nodeName]
  1809. if !ok {
  1810. klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
  1811. continue
  1812. }
  1813. nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
  1814. if !ok {
  1815. klog.V(1).Infof("[Reserved] Could not find node region")
  1816. continue
  1817. }
  1818. reservedInstances, ok := instances[nodeRegion]
  1819. if !ok {
  1820. klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
  1821. continue
  1822. }
  1823. // Determine the InstanceType of the node
  1824. instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
  1825. if !ok {
  1826. continue
  1827. }
  1828. ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
  1829. if err != nil {
  1830. continue
  1831. }
  1832. ramGB := ramBytes / 1024 / 1024 / 1024
  1833. cpu, err := strconv.ParseFloat(node.VCPU, 64)
  1834. if err != nil {
  1835. continue
  1836. }
  1837. ramMultiple := cpu*cpuToRAMRatio + ramGB
  1838. node.Reserved = &ReservedInstanceData{
  1839. ReservedCPU: 0,
  1840. ReservedRAM: 0,
  1841. }
  1842. for i, reservedInstance := range reservedInstances {
  1843. if reservedInstance.InstanceType == instanceType {
  1844. // Use < 0 to mark as ALL
  1845. node.Reserved.ReservedCPU = -1
  1846. node.Reserved.ReservedRAM = -1
  1847. // Set Costs based on CPU/RAM ratios
  1848. ramPrice := reservedInstance.PricePerHour / ramMultiple
  1849. node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
  1850. node.Reserved.RAMCost = ramPrice
  1851. // Remove the reserve from the temporary slice to prevent
  1852. // being reallocated
  1853. instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
  1854. break
  1855. }
  1856. }
  1857. }*/
  1858. }
  // AWSReservedInstance is a flattened view of one EC2 reserved instance
// reservation, with its price resolved to an hourly figure.
type AWSReservedInstance struct {
	Zone           string    // availability zone; empty when not set
	Region         string    // region the reservation was fetched from
	InstanceType   string    // EC2 instance type
	InstanceCount  int64     // number of instances in the reservation
	InstanceTenacy string    // instance tenancy (field name spelling kept for compatibility)
	StartDate      time.Time // reservation start
	EndDate        time.Time // reservation end
	PricePerHour   float64   // resolved hourly price
}

// String renders every field of the reservation on a single bracketed line.
func (ari *AWSReservedInstance) String() string {
	const layout = "[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]"
	return fmt.Sprintf(layout,
		ari.Zone,
		ari.Region,
		ari.InstanceType,
		ari.InstanceCount,
		ari.InstanceTenacy,
		ari.StartDate,
		ari.EndDate,
		ari.PricePerHour,
	)
}
  1872. func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
  1873. return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
  1874. }
  1875. func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
  1876. var pricePerHour float64
  1877. if len(ri.RecurringCharges) > 0 {
  1878. for _, rc := range ri.RecurringCharges {
  1879. if isReservedInstanceHourlyPrice(rc) {
  1880. pricePerHour = *rc.Amount
  1881. break
  1882. }
  1883. }
  1884. }
  1885. // If we're still unable to resolve hourly price, try fixed -> hourly
  1886. if pricePerHour == 0 {
  1887. if ri.Duration != nil && ri.FixedPrice != nil {
  1888. var durHours float64
  1889. durSeconds := float64(*ri.Duration)
  1890. fixedPrice := float64(*ri.FixedPrice)
  1891. if durSeconds != 0 && fixedPrice != 0 {
  1892. durHours = durSeconds / 60 / 60
  1893. pricePerHour = fixedPrice / durHours
  1894. }
  1895. }
  1896. }
  1897. if pricePerHour == 0 {
  1898. return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
  1899. }
  1900. return pricePerHour, nil
  1901. }
  1902. func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
  1903. c := &aws.Config{
  1904. Region: aws.String(region),
  1905. }
  1906. s := session.Must(session.NewSession(c))
  1907. svc := ec2.New(s)
  1908. response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
  1909. if err != nil {
  1910. return nil, err
  1911. }
  1912. var reservedInstances []*AWSReservedInstance
  1913. for _, ri := range response.ReservedInstances {
  1914. var zone string
  1915. if ri.AvailabilityZone != nil {
  1916. zone = *ri.AvailabilityZone
  1917. }
  1918. pricePerHour, err := getReservedInstancePrice(ri)
  1919. if err != nil {
  1920. klog.V(1).Infof("Error Resolving Price: %s", err.Error())
  1921. continue
  1922. }
  1923. reservedInstances = append(reservedInstances, &AWSReservedInstance{
  1924. Zone: zone,
  1925. Region: region,
  1926. InstanceType: *ri.InstanceType,
  1927. InstanceCount: *ri.InstanceCount,
  1928. InstanceTenacy: *ri.InstanceTenancy,
  1929. StartDate: *ri.Start,
  1930. EndDate: *ri.End,
  1931. PricePerHour: pricePerHour,
  1932. })
  1933. }
  1934. return reservedInstances, nil
  1935. }
  1936. func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
  1937. err := a.configureAWSAuth()
  1938. if err != nil {
  1939. return nil, fmt.Errorf("Error Configuring aws auth: %s", err.Error())
  1940. }
  1941. var reservedInstances []*AWSReservedInstance
  1942. nodes := a.Clientset.GetAllNodes()
  1943. regionsSeen := make(map[string]bool)
  1944. for _, node := range nodes {
  1945. region, ok := node.Labels[v1.LabelZoneRegion]
  1946. if !ok {
  1947. continue
  1948. }
  1949. if regionsSeen[region] {
  1950. continue
  1951. }
  1952. ris, err := getRegionReservedInstances(region)
  1953. if err != nil {
  1954. klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
  1955. continue
  1956. }
  1957. regionsSeen[region] = true
  1958. reservedInstances = append(reservedInstances, ris...)
  1959. }
  1960. return reservedInstances, nil
  1961. }