awsprovider.go 72 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "k8s.io/klog"
  18. "github.com/kubecost/cost-model/pkg/clustercache"
  19. "github.com/kubecost/cost-model/pkg/env"
  20. "github.com/kubecost/cost-model/pkg/errors"
  21. "github.com/kubecost/cost-model/pkg/log"
  22. "github.com/kubecost/cost-model/pkg/util"
  23. "github.com/aws/aws-sdk-go/aws"
  24. "github.com/aws/aws-sdk-go/aws/awserr"
  25. "github.com/aws/aws-sdk-go/aws/credentials"
  26. "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
  27. "github.com/aws/aws-sdk-go/aws/session"
  28. "github.com/aws/aws-sdk-go/service/athena"
  29. "github.com/aws/aws-sdk-go/service/ec2"
  30. "github.com/aws/aws-sdk-go/service/s3"
  31. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  32. "github.com/jszwec/csvutil"
  33. v1 "k8s.io/api/core/v1"
  34. )
  35. const awsReservedInstancePricePerHour = 0.0287
  36. const supportedSpotFeedVersion = "1"
  37. const SpotInfoUpdateType = "spotinfo"
  38. const AthenaInfoUpdateType = "athenainfo"
  39. const PreemptibleType = "preemptible"
  40. // How often spot data is refreshed
  41. const SpotRefreshDuration = 15 * time.Minute
  42. const defaultConfigPath = "/var/configs/"
  43. var awsRegions = []string{
  44. "us-east-2",
  45. "us-east-1",
  46. "us-west-1",
  47. "us-west-2",
  48. "ap-east-1",
  49. "ap-south-1",
  50. "ap-northeast-3",
  51. "ap-northeast-2",
  52. "ap-southeast-1",
  53. "ap-southeast-2",
  54. "ap-northeast-1",
  55. "ca-central-1",
  56. "cn-north-1",
  57. "cn-northwest-1",
  58. "eu-central-1",
  59. "eu-west-1",
  60. "eu-west-2",
  61. "eu-west-3",
  62. "eu-north-1",
  63. "me-south-1",
  64. "sa-east-1",
  65. "us-gov-east-1",
  66. "us-gov-west-1",
  67. }
  68. // AWS represents an Amazon Provider
  69. type AWS struct {
  70. Pricing map[string]*AWSProductTerms
  71. SpotPricingByInstanceID map[string]*spotInfo
  72. SpotPricingUpdatedAt *time.Time
  73. SpotRefreshRunning bool
  74. SpotPricingLock sync.RWMutex
  75. RIPricingByInstanceID map[string]*RIData
  76. RIDataRunning bool
  77. RIDataLock sync.RWMutex
  78. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  79. SavingsPlanDataRunning bool
  80. SavingsPlanDataLock sync.RWMutex
  81. ValidPricingKeys map[string]bool
  82. Clientset clustercache.ClusterCache
  83. BaseCPUPrice string
  84. BaseRAMPrice string
  85. BaseGPUPrice string
  86. BaseSpotCPUPrice string
  87. BaseSpotRAMPrice string
  88. SpotLabelName string
  89. SpotLabelValue string
  90. ServiceKeyName string
  91. ServiceKeySecret string
  92. SpotDataRegion string
  93. SpotDataBucket string
  94. SpotDataPrefix string
  95. ProjectID string
  96. DownloadPricingDataLock sync.RWMutex
  97. Config *ProviderConfig
  98. ServiceAccountChecks map[string]*ServiceAccountCheck
  99. clusterManagementPrice float64
  100. clusterProvisioner string
  101. *CustomProvider
  102. }
  103. type AWSAccessKey struct {
  104. AccessKeyID string `json:"aws_access_key_id"`
  105. SecretAccessKey string `json:"aws_secret_access_key"`
  106. }
  107. // AWSPricing maps a k8s node to an AWS Pricing "product"
  108. type AWSPricing struct {
  109. Products map[string]*AWSProduct `json:"products"`
  110. Terms AWSPricingTerms `json:"terms"`
  111. }
  112. // AWSProduct represents a purchased SKU
  113. type AWSProduct struct {
  114. Sku string `json:"sku"`
  115. Attributes AWSProductAttributes `json:"attributes"`
  116. }
  117. // AWSProductAttributes represents metadata about the product used to map to a node.
  118. type AWSProductAttributes struct {
  119. Location string `json:"location"`
  120. InstanceType string `json:"instanceType"`
  121. Memory string `json:"memory"`
  122. Storage string `json:"storage"`
  123. VCpu string `json:"vcpu"`
  124. UsageType string `json:"usagetype"`
  125. OperatingSystem string `json:"operatingSystem"`
  126. PreInstalledSw string `json:"preInstalledSw"`
  127. InstanceFamily string `json:"instanceFamily"`
  128. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  129. }
  130. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  131. type AWSPricingTerms struct {
  132. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  133. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  134. }
  135. // AWSOfferTerm is a sku extension used to pay for the node.
  136. type AWSOfferTerm struct {
  137. Sku string `json:"sku"`
  138. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  139. }
  140. // AWSRateCode encodes data about the price of a product
  141. type AWSRateCode struct {
  142. Unit string `json:"unit"`
  143. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  144. }
  145. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  146. type AWSCurrencyCode struct {
  147. USD string `json:"USD"`
  148. }
  149. // AWSProductTerms represents the full terms of the product
  150. type AWSProductTerms struct {
  151. Sku string `json:"sku"`
  152. OnDemand *AWSOfferTerm `json:"OnDemand"`
  153. Reserved *AWSOfferTerm `json:"Reserved"`
  154. Memory string `json:"memory"`
  155. Storage string `json:"storage"`
  156. VCpu string `json:"vcpu"`
  157. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  158. PV *PV `json:"pv"`
  159. }
  160. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  161. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  162. // OnDemandRateCode is appended to an node sku
  163. const OnDemandRateCode = ".JRTCKXETXF"
  164. // ReservedRateCode is appended to a node sku
  165. const ReservedRateCode = ".38NPMPTW36"
  166. // HourlyRateCode is appended to a node sku
  167. const HourlyRateCode = ".6YS6EN2CT7"
  168. // volTypes are used to map between AWS UsageTypes and
  169. // EBS volume types, as they would appear in K8s storage class
  170. // name and the EC2 API.
  171. var volTypes = map[string]string{
  172. "EBS:VolumeUsage.gp2": "gp2",
  173. "EBS:VolumeUsage": "standard",
  174. "EBS:VolumeUsage.sc1": "sc1",
  175. "EBS:VolumeP-IOPS.piops": "io1",
  176. "EBS:VolumeUsage.st1": "st1",
  177. "EBS:VolumeUsage.piops": "io1",
  178. "gp2": "EBS:VolumeUsage.gp2",
  179. "standard": "EBS:VolumeUsage",
  180. "sc1": "EBS:VolumeUsage.sc1",
  181. "io1": "EBS:VolumeUsage.piops",
  182. "st1": "EBS:VolumeUsage.st1",
  183. }
  184. // locationToRegion maps AWS region names (As they come from Billing)
  185. // to actual region identifiers
  186. var locationToRegion = map[string]string{
  187. "US East (Ohio)": "us-east-2",
  188. "US East (N. Virginia)": "us-east-1",
  189. "US West (N. California)": "us-west-1",
  190. "US West (Oregon)": "us-west-2",
  191. "Asia Pacific (Hong Kong)": "ap-east-1",
  192. "Asia Pacific (Mumbai)": "ap-south-1",
  193. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  194. "Asia Pacific (Seoul)": "ap-northeast-2",
  195. "Asia Pacific (Singapore)": "ap-southeast-1",
  196. "Asia Pacific (Sydney)": "ap-southeast-2",
  197. "Asia Pacific (Tokyo)": "ap-northeast-1",
  198. "Canada (Central)": "ca-central-1",
  199. "China (Beijing)": "cn-north-1",
  200. "China (Ningxia)": "cn-northwest-1",
  201. "EU (Frankfurt)": "eu-central-1",
  202. "EU (Ireland)": "eu-west-1",
  203. "EU (London)": "eu-west-2",
  204. "EU (Paris)": "eu-west-3",
  205. "EU (Stockholm)": "eu-north-1",
  206. "South America (Sao Paulo)": "sa-east-1",
  207. "AWS GovCloud (US-East)": "us-gov-east-1",
  208. "AWS GovCloud (US)": "us-gov-west-1",
  209. }
  210. var regionToBillingRegionCode = map[string]string{
  211. "us-east-2": "USE2",
  212. "us-east-1": "",
  213. "us-west-1": "USW1",
  214. "us-west-2": "USW2",
  215. "ap-east-1": "APE1",
  216. "ap-south-1": "APS3",
  217. "ap-northeast-3": "APN3",
  218. "ap-northeast-2": "APN2",
  219. "ap-southeast-1": "APS1",
  220. "ap-southeast-2": "APS2",
  221. "ap-northeast-1": "APN1",
  222. "ca-central-1": "CAN1",
  223. "cn-north-1": "",
  224. "cn-northwest-1": "",
  225. "eu-central-1": "EUC1",
  226. "eu-west-1": "EU",
  227. "eu-west-2": "EUW2",
  228. "eu-west-3": "EUW3",
  229. "eu-north-1": "EUN1",
  230. "sa-east-1": "SAE1",
  231. "us-gov-east-1": "UGE1",
  232. "us-gov-west-1": "UGW1",
  233. }
  234. var loadedAWSSecret bool = false
  235. var awsSecret *AWSAccessKey = nil
  236. func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
  237. return ""
  238. }
  239. // KubeAttrConversion maps the k8s labels for region to an aws region
  240. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  241. operatingSystem = strings.ToLower(operatingSystem)
  242. region := locationToRegion[location]
  243. return region + "," + instanceType + "," + operatingSystem
  244. }
  245. type AwsSpotFeedInfo struct {
  246. BucketName string `json:"bucketName"`
  247. Prefix string `json:"prefix"`
  248. Region string `json:"region"`
  249. AccountID string `json:"projectID"`
  250. ServiceKeyName string `json:"serviceKeyName"`
  251. ServiceKeySecret string `json:"serviceKeySecret"`
  252. SpotLabel string `json:"spotLabel"`
  253. SpotLabelValue string `json:"spotLabelValue"`
  254. }
  255. type AwsAthenaInfo struct {
  256. AthenaBucketName string `json:"athenaBucketName"`
  257. AthenaRegion string `json:"athenaRegion"`
  258. AthenaDatabase string `json:"athenaDatabase"`
  259. AthenaTable string `json:"athenaTable"`
  260. ServiceKeyName string `json:"serviceKeyName"`
  261. ServiceKeySecret string `json:"serviceKeySecret"`
  262. AccountID string `json:"projectID"`
  263. MasterPayerARN string `json:"masterPayerARN"`
  264. }
  265. func (aws *AWS) GetManagementPlatform() (string, error) {
  266. nodes := aws.Clientset.GetAllNodes()
  267. if len(nodes) > 0 {
  268. n := nodes[0]
  269. version := n.Status.NodeInfo.KubeletVersion
  270. if strings.Contains(version, "eks") {
  271. return "eks", nil
  272. }
  273. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  274. return "kops", nil
  275. }
  276. }
  277. return "", nil
  278. }
  279. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  280. c, err := aws.Config.GetCustomPricingData()
  281. if c.Discount == "" {
  282. c.Discount = "0%"
  283. }
  284. if c.NegotiatedDiscount == "" {
  285. c.NegotiatedDiscount = "0%"
  286. }
  287. if err != nil {
  288. return nil, err
  289. }
  290. return c, nil
  291. }
  292. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  293. return aws.Config.UpdateFromMap(a)
  294. }
  295. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  296. return aws.Config.Update(func(c *CustomPricing) error {
  297. if updateType == SpotInfoUpdateType {
  298. a := AwsSpotFeedInfo{}
  299. err := json.NewDecoder(r).Decode(&a)
  300. if err != nil {
  301. return err
  302. }
  303. c.ServiceKeyName = a.ServiceKeyName
  304. if a.ServiceKeySecret != "" {
  305. c.ServiceKeySecret = a.ServiceKeySecret
  306. }
  307. c.SpotDataPrefix = a.Prefix
  308. c.SpotDataBucket = a.BucketName
  309. c.ProjectID = a.AccountID
  310. c.SpotDataRegion = a.Region
  311. c.SpotLabel = a.SpotLabel
  312. c.SpotLabelValue = a.SpotLabelValue
  313. } else if updateType == AthenaInfoUpdateType {
  314. a := AwsAthenaInfo{}
  315. err := json.NewDecoder(r).Decode(&a)
  316. if err != nil {
  317. return err
  318. }
  319. c.AthenaBucketName = a.AthenaBucketName
  320. c.AthenaRegion = a.AthenaRegion
  321. c.AthenaDatabase = a.AthenaDatabase
  322. c.AthenaTable = a.AthenaTable
  323. c.ServiceKeyName = a.ServiceKeyName
  324. if a.ServiceKeySecret != "" {
  325. c.ServiceKeySecret = a.ServiceKeySecret
  326. }
  327. if a.MasterPayerARN != "" {
  328. c.MasterPayerARN = a.MasterPayerARN
  329. }
  330. c.AthenaProjectID = a.AccountID
  331. } else {
  332. a := make(map[string]interface{})
  333. err := json.NewDecoder(r).Decode(&a)
  334. if err != nil {
  335. return err
  336. }
  337. for k, v := range a {
  338. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  339. vstr, ok := v.(string)
  340. if ok {
  341. err := SetCustomPricingField(c, kUpper, vstr)
  342. if err != nil {
  343. return err
  344. }
  345. } else {
  346. sci := v.(map[string]interface{})
  347. sc := make(map[string]string)
  348. for k, val := range sci {
  349. sc[k] = val.(string)
  350. }
  351. c.SharedCosts = sc //todo: support reflection/multiple map fields
  352. }
  353. }
  354. }
  355. if env.IsRemoteEnabled() {
  356. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  357. if err != nil {
  358. return err
  359. }
  360. }
  361. return nil
  362. })
  363. }
  364. type awsKey struct {
  365. SpotLabelName string
  366. SpotLabelValue string
  367. Labels map[string]string
  368. ProviderID string
  369. }
  370. func (k *awsKey) GPUType() string {
  371. return ""
  372. }
  373. func (k *awsKey) ID() string {
  374. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  375. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  376. if matchNum == 2 {
  377. return group
  378. }
  379. }
  380. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  381. return ""
  382. }
  383. func (k *awsKey) Features() string {
  384. instanceType := k.Labels[v1.LabelInstanceType]
  385. var operatingSystem string
  386. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  387. if !ok {
  388. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  389. }
  390. region := k.Labels[v1.LabelZoneRegion]
  391. key := region + "," + instanceType + "," + operatingSystem
  392. usageType := PreemptibleType
  393. spotKey := key + "," + usageType
  394. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  395. return spotKey
  396. }
  397. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  398. return spotKey
  399. }
  400. return key
  401. }
  402. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  403. pricing, ok := aws.Pricing[pvk.Features()]
  404. if !ok {
  405. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  406. return &PV{}, nil
  407. }
  408. return pricing.PV, nil
  409. }
  410. type awsPVKey struct {
  411. Labels map[string]string
  412. StorageClassParameters map[string]string
  413. StorageClassName string
  414. Name string
  415. DefaultRegion string
  416. ProviderID string
  417. }
  418. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  419. providerID := ""
  420. if pv.Spec.AWSElasticBlockStore != nil {
  421. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  422. }
  423. return &awsPVKey{
  424. Labels: pv.Labels,
  425. StorageClassName: pv.Spec.StorageClassName,
  426. StorageClassParameters: parameters,
  427. Name: pv.Name,
  428. DefaultRegion: defaultRegion,
  429. ProviderID: providerID,
  430. }
  431. }
  432. func (key *awsPVKey) ID() string {
  433. return key.ProviderID
  434. }
  435. func (key *awsPVKey) GetStorageClass() string {
  436. return key.StorageClassName
  437. }
  438. func (key *awsPVKey) Features() string {
  439. storageClass := key.StorageClassParameters["type"]
  440. if storageClass == "standard" {
  441. storageClass = "gp2"
  442. }
  443. // Storage class names are generally EBS volume types (gp2)
  444. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  445. // Converts between the 2
  446. region := key.Labels[v1.LabelZoneRegion]
  447. //if region == "" {
  448. // region = "us-east-1"
  449. //}
  450. class, ok := volTypes[storageClass]
  451. if !ok {
  452. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  453. }
  454. return region + "," + class
  455. }
  456. // GetKey maps node labels to information needed to retrieve pricing data
  457. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  458. return &awsKey{
  459. SpotLabelName: aws.SpotLabelName,
  460. SpotLabelValue: aws.SpotLabelValue,
  461. Labels: labels,
  462. ProviderID: labels["providerID"],
  463. }
  464. }
  465. func (aws *AWS) isPreemptible(key string) bool {
  466. s := strings.Split(key, ",")
  467. if len(s) == 4 && s[3] == PreemptibleType {
  468. return true
  469. }
  470. return false
  471. }
  472. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  473. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  474. }
  475. // DownloadPricingData fetches data from the AWS Pricing API
  476. func (aws *AWS) DownloadPricingData() error {
  477. aws.DownloadPricingDataLock.Lock()
  478. defer aws.DownloadPricingDataLock.Unlock()
  479. if aws.ServiceAccountChecks == nil {
  480. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  481. }
  482. c, err := aws.Config.GetCustomPricingData()
  483. if err != nil {
  484. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  485. }
  486. aws.BaseCPUPrice = c.CPU
  487. aws.BaseRAMPrice = c.RAM
  488. aws.BaseGPUPrice = c.GPU
  489. aws.BaseSpotCPUPrice = c.SpotCPU
  490. aws.BaseSpotRAMPrice = c.SpotRAM
  491. aws.SpotLabelName = c.SpotLabel
  492. aws.SpotLabelValue = c.SpotLabelValue
  493. aws.SpotDataBucket = c.SpotDataBucket
  494. aws.SpotDataPrefix = c.SpotDataPrefix
  495. aws.ProjectID = c.ProjectID
  496. aws.SpotDataRegion = c.SpotDataRegion
  497. skn, sks := aws.getAWSAuth(false, c)
  498. aws.ServiceKeyName = skn
  499. aws.ServiceKeySecret = sks
  500. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  501. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  502. }
  503. nodeList := aws.Clientset.GetAllNodes()
  504. inputkeys := make(map[string]bool)
  505. for _, n := range nodeList {
  506. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  507. aws.clusterManagementPrice = 0.10
  508. aws.clusterProvisioner = "EKS"
  509. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  510. aws.clusterProvisioner = "KOPS"
  511. }
  512. labels := n.GetObjectMeta().GetLabels()
  513. key := aws.GetKey(labels, n)
  514. inputkeys[key.Features()] = true
  515. }
  516. pvList := aws.Clientset.GetAllPersistentVolumes()
  517. storageClasses := aws.Clientset.GetAllStorageClasses()
  518. storageClassMap := make(map[string]map[string]string)
  519. for _, storageClass := range storageClasses {
  520. params := storageClass.Parameters
  521. storageClassMap[storageClass.ObjectMeta.Name] = params
  522. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  523. storageClassMap["default"] = params
  524. storageClassMap[""] = params
  525. }
  526. }
  527. pvkeys := make(map[string]PVKey)
  528. for _, pv := range pvList {
  529. params, ok := storageClassMap[pv.Spec.StorageClassName]
  530. if !ok {
  531. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  532. continue
  533. }
  534. key := aws.GetPVKey(pv, params, "")
  535. pvkeys[key.Features()] = key
  536. }
  537. // RIDataRunning establishes the existance of the goroutine. Since it's possible we
  538. // run multiple downloads, we don't want to create multiple go routines if one already exists
  539. if !aws.RIDataRunning && c.AthenaBucketName != "" {
  540. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  541. if err != nil {
  542. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  543. } else { // If we make one successful run, check on new reservation data every hour
  544. go func() {
  545. defer errors.HandlePanic()
  546. aws.RIDataRunning = true
  547. for {
  548. klog.Infof("Reserved Instance watcher running... next update in 1h")
  549. time.Sleep(time.Hour)
  550. err := aws.GetReservationDataFromAthena()
  551. if err != nil {
  552. klog.Infof("Error updating RI data: %s", err.Error())
  553. }
  554. }
  555. }()
  556. }
  557. }
  558. if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
  559. err = aws.GetSavingsPlanDataFromAthena()
  560. if err != nil {
  561. klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
  562. } else {
  563. go func() {
  564. defer errors.HandlePanic()
  565. aws.SavingsPlanDataRunning = true
  566. for {
  567. klog.Infof("Savings Plan watcher running... next update in 1h")
  568. time.Sleep(time.Hour)
  569. err := aws.GetSavingsPlanDataFromAthena()
  570. if err != nil {
  571. klog.Infof("Error updating Savings Plan data: %s", err.Error())
  572. }
  573. }
  574. }()
  575. }
  576. }
  577. aws.Pricing = make(map[string]*AWSProductTerms)
  578. aws.ValidPricingKeys = make(map[string]bool)
  579. skusToKeys := make(map[string]string)
  580. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  581. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  582. resp, err := http.Get(pricingURL)
  583. if err != nil {
  584. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  585. return err
  586. }
  587. dec := json.NewDecoder(resp.Body)
  588. for {
  589. t, err := dec.Token()
  590. if err == io.EOF {
  591. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  592. break
  593. }
  594. if t == "products" {
  595. _, err := dec.Token() // this should parse the opening "{""
  596. if err != nil {
  597. return err
  598. }
  599. for dec.More() {
  600. _, err := dec.Token() // the sku token
  601. if err != nil {
  602. return err
  603. }
  604. product := &AWSProduct{}
  605. err = dec.Decode(&product)
  606. if err != nil {
  607. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  608. break
  609. }
  610. if product.Attributes.PreInstalledSw == "NA" &&
  611. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  612. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  613. spotKey := key + ",preemptible"
  614. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  615. productTerms := &AWSProductTerms{
  616. Sku: product.Sku,
  617. Memory: product.Attributes.Memory,
  618. Storage: product.Attributes.Storage,
  619. VCpu: product.Attributes.VCpu,
  620. GPU: product.Attributes.GPU,
  621. }
  622. aws.Pricing[key] = productTerms
  623. aws.Pricing[spotKey] = productTerms
  624. skusToKeys[product.Sku] = key
  625. }
  626. aws.ValidPricingKeys[key] = true
  627. aws.ValidPricingKeys[spotKey] = true
  628. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  629. // UsageTypes may be prefixed with a region code - we're removing this when using
  630. // volTypes to keep lookups generic
  631. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  632. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  633. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  634. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  635. spotKey := key + ",preemptible"
  636. pv := &PV{
  637. Class: volTypes[usageTypeNoRegion],
  638. Region: locationToRegion[product.Attributes.Location],
  639. }
  640. productTerms := &AWSProductTerms{
  641. Sku: product.Sku,
  642. PV: pv,
  643. }
  644. aws.Pricing[key] = productTerms
  645. aws.Pricing[spotKey] = productTerms
  646. skusToKeys[product.Sku] = key
  647. aws.ValidPricingKeys[key] = true
  648. aws.ValidPricingKeys[spotKey] = true
  649. }
  650. }
  651. }
  652. if t == "terms" {
  653. _, err := dec.Token() // this should parse the opening "{""
  654. if err != nil {
  655. return err
  656. }
  657. termType, err := dec.Token()
  658. if err != nil {
  659. return err
  660. }
  661. if termType == "OnDemand" {
  662. _, err := dec.Token()
  663. if err != nil { // again, should parse an opening "{"
  664. return err
  665. }
  666. for dec.More() {
  667. sku, err := dec.Token()
  668. if err != nil {
  669. return err
  670. }
  671. _, err = dec.Token() // another opening "{"
  672. if err != nil {
  673. return err
  674. }
  675. skuOnDemand, err := dec.Token()
  676. if err != nil {
  677. return err
  678. }
  679. offerTerm := &AWSOfferTerm{}
  680. err = dec.Decode(&offerTerm)
  681. if err != nil {
  682. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  683. }
  684. if sku.(string)+OnDemandRateCode == skuOnDemand {
  685. key, ok := skusToKeys[sku.(string)]
  686. spotKey := key + ",preemptible"
  687. if ok {
  688. aws.Pricing[key].OnDemand = offerTerm
  689. aws.Pricing[spotKey].OnDemand = offerTerm
  690. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  691. // If the specific UsageType is the per IO cost used on io1 volumes
  692. // we need to add the per IO cost to the io1 PV cost
  693. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  694. // Add the per IO cost to the PV object for the io1 volume type
  695. aws.Pricing[key].PV.CostPerIO = cost
  696. } else if strings.Contains(key, "EBS:Volume") {
  697. // If volume, we need to get hourly cost and add it to the PV object
  698. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  699. costFloat, _ := strconv.ParseFloat(cost, 64)
  700. hourlyPrice := costFloat / 730
  701. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  702. }
  703. }
  704. }
  705. _, err = dec.Token()
  706. if err != nil {
  707. return err
  708. }
  709. }
  710. _, err = dec.Token()
  711. if err != nil {
  712. return err
  713. }
  714. }
  715. }
  716. }
  717. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  718. // Always run spot pricing refresh when performing download
  719. aws.refreshSpotPricing(true)
  720. // Only start a single refresh goroutine
  721. if !aws.SpotRefreshRunning {
  722. aws.SpotRefreshRunning = true
  723. go func() {
  724. defer errors.HandlePanic()
  725. for {
  726. klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  727. time.Sleep(SpotRefreshDuration)
  728. // Reoccurring refresh checks update times
  729. aws.refreshSpotPricing(false)
  730. }
  731. }()
  732. }
  733. return nil
  734. }
  735. func (aws *AWS) refreshSpotPricing(force bool) {
  736. aws.SpotPricingLock.Lock()
  737. defer aws.SpotPricingLock.Unlock()
  738. now := time.Now().UTC()
  739. updateTime := now.Add(-SpotRefreshDuration)
  740. // Return if there was an update time set and an hour hasn't elapsed
  741. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  742. return
  743. }
  744. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  745. if err != nil {
  746. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  747. return
  748. }
  749. // update time last updated
  750. aws.SpotPricingUpdatedAt = &now
  751. aws.SpotPricingByInstanceID = sp
  752. }
  753. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  754. func (aws *AWS) NetworkPricing() (*Network, error) {
  755. cpricing, err := aws.Config.GetCustomPricingData()
  756. if err != nil {
  757. return nil, err
  758. }
  759. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  760. if err != nil {
  761. return nil, err
  762. }
  763. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  764. if err != nil {
  765. return nil, err
  766. }
  767. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  768. if err != nil {
  769. return nil, err
  770. }
  771. return &Network{
  772. ZoneNetworkEgressCost: znec,
  773. RegionNetworkEgressCost: rnec,
  774. InternetNetworkEgressCost: inec,
  775. }, nil
  776. }
  777. func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
  778. fffrc := 0.025
  779. afrc := 0.010
  780. lbidc := 0.008
  781. numForwardingRules := 1.0
  782. dataIngressGB := 0.0
  783. var totalCost float64
  784. if numForwardingRules < 5 {
  785. totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
  786. } else {
  787. totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
  788. }
  789. return &LoadBalancer{
  790. Cost: totalCost,
  791. }, nil
  792. }
  793. // AllNodePricing returns all the billing data fetched.
  794. func (aws *AWS) AllNodePricing() (interface{}, error) {
  795. aws.DownloadPricingDataLock.RLock()
  796. defer aws.DownloadPricingDataLock.RUnlock()
  797. return aws.Pricing, nil
  798. }
  799. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  800. aws.SpotPricingLock.RLock()
  801. defer aws.SpotPricingLock.RUnlock()
  802. info, ok := aws.SpotPricingByInstanceID[instanceID]
  803. return info, ok
  804. }
  805. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  806. aws.RIDataLock.RLock()
  807. defer aws.RIDataLock.RUnlock()
  808. data, ok := aws.RIPricingByInstanceID[instanceID]
  809. return data, ok
  810. }
  811. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  812. aws.SavingsPlanDataLock.RLock()
  813. defer aws.SavingsPlanDataLock.RUnlock()
  814. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  815. return data, ok
  816. }
  817. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  818. key := k.Features()
  819. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  820. var spotcost string
  821. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  822. arr := strings.Split(spotInfo.Charge, " ")
  823. if len(arr) == 2 {
  824. spotcost = arr[0]
  825. } else {
  826. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  827. }
  828. return &Node{
  829. Cost: spotcost,
  830. VCPU: terms.VCpu,
  831. RAM: terms.Memory,
  832. GPU: terms.GPU,
  833. Storage: terms.Storage,
  834. BaseCPUPrice: aws.BaseCPUPrice,
  835. BaseRAMPrice: aws.BaseRAMPrice,
  836. BaseGPUPrice: aws.BaseGPUPrice,
  837. UsageType: PreemptibleType,
  838. }, nil
  839. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  840. log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
  841. return &Node{
  842. VCPU: terms.VCpu,
  843. VCPUCost: aws.BaseSpotCPUPrice,
  844. RAM: terms.Memory,
  845. GPU: terms.GPU,
  846. RAMCost: aws.BaseSpotRAMPrice,
  847. Storage: terms.Storage,
  848. BaseCPUPrice: aws.BaseCPUPrice,
  849. BaseRAMPrice: aws.BaseRAMPrice,
  850. BaseGPUPrice: aws.BaseGPUPrice,
  851. UsageType: PreemptibleType,
  852. }, nil
  853. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  854. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  855. return &Node{
  856. Cost: strCost,
  857. VCPU: terms.VCpu,
  858. RAM: terms.Memory,
  859. GPU: terms.GPU,
  860. Storage: terms.Storage,
  861. BaseCPUPrice: aws.BaseCPUPrice,
  862. BaseRAMPrice: aws.BaseRAMPrice,
  863. BaseGPUPrice: aws.BaseGPUPrice,
  864. UsageType: usageType,
  865. }, nil
  866. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  867. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  868. return &Node{
  869. Cost: strCost,
  870. VCPU: terms.VCpu,
  871. RAM: terms.Memory,
  872. GPU: terms.GPU,
  873. Storage: terms.Storage,
  874. BaseCPUPrice: aws.BaseCPUPrice,
  875. BaseRAMPrice: aws.BaseRAMPrice,
  876. BaseGPUPrice: aws.BaseGPUPrice,
  877. UsageType: usageType,
  878. }, nil
  879. }
  880. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  881. if !ok {
  882. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  883. }
  884. cost := c.PricePerUnit.USD
  885. return &Node{
  886. Cost: cost,
  887. VCPU: terms.VCpu,
  888. RAM: terms.Memory,
  889. GPU: terms.GPU,
  890. Storage: terms.Storage,
  891. BaseCPUPrice: aws.BaseCPUPrice,
  892. BaseRAMPrice: aws.BaseRAMPrice,
  893. BaseGPUPrice: aws.BaseGPUPrice,
  894. UsageType: usageType,
  895. }, nil
  896. }
  897. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  898. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  899. aws.DownloadPricingDataLock.RLock()
  900. defer aws.DownloadPricingDataLock.RUnlock()
  901. key := k.Features()
  902. usageType := "ondemand"
  903. if aws.isPreemptible(key) {
  904. usageType = PreemptibleType
  905. }
  906. terms, ok := aws.Pricing[key]
  907. if ok {
  908. return aws.createNode(terms, usageType, k)
  909. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  910. aws.DownloadPricingDataLock.RUnlock()
  911. err := aws.DownloadPricingData()
  912. aws.DownloadPricingDataLock.RLock()
  913. if err != nil {
  914. return &Node{
  915. Cost: aws.BaseCPUPrice,
  916. BaseCPUPrice: aws.BaseCPUPrice,
  917. BaseRAMPrice: aws.BaseRAMPrice,
  918. BaseGPUPrice: aws.BaseGPUPrice,
  919. UsageType: usageType,
  920. UsesBaseCPUPrice: true,
  921. }, err
  922. }
  923. terms, termsOk := aws.Pricing[key]
  924. if !termsOk {
  925. return &Node{
  926. Cost: aws.BaseCPUPrice,
  927. BaseCPUPrice: aws.BaseCPUPrice,
  928. BaseRAMPrice: aws.BaseRAMPrice,
  929. BaseGPUPrice: aws.BaseGPUPrice,
  930. UsageType: usageType,
  931. UsesBaseCPUPrice: true,
  932. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  933. }
  934. return aws.createNode(terms, usageType, k)
  935. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  936. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  937. }
  938. }
  939. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  940. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  941. defaultClusterName := "AWS Cluster #1"
  942. c, err := awsProvider.GetConfig()
  943. if err != nil {
  944. return nil, err
  945. }
  946. remoteEnabled := env.IsRemoteEnabled()
  947. if c.ClusterName != "" {
  948. m := make(map[string]string)
  949. m["name"] = c.ClusterName
  950. m["provider"] = "AWS"
  951. m["id"] = env.GetClusterID()
  952. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  953. m["provisioner"] = awsProvider.clusterProvisioner
  954. return m, nil
  955. }
  956. makeStructure := func(clusterName string) (map[string]string, error) {
  957. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  958. m := make(map[string]string)
  959. m["name"] = clusterName
  960. m["provider"] = "AWS"
  961. m["id"] = env.GetClusterID()
  962. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  963. return m, nil
  964. }
  965. maybeClusterId := env.GetAWSClusterID()
  966. if len(maybeClusterId) != 0 {
  967. return makeStructure(maybeClusterId)
  968. }
  969. // TODO: This should be cached, it can take a long time to hit the API
  970. //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  971. //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  972. //klog.Infof("nodelist get here %s", time.Now())
  973. //nodeList := awsProvider.Clientset.GetAllNodes()
  974. //klog.Infof("nodelist done here %s", time.Now())
  975. /*for _, n := range nodeList {
  976. region := ""
  977. instanceId := ""
  978. providerId := n.Spec.ProviderID
  979. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  980. if matchNum == 1 {
  981. region = group
  982. } else if matchNum == 2 {
  983. instanceId = group
  984. }
  985. }
  986. if len(instanceId) == 0 {
  987. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  988. continue
  989. }
  990. c := &aws.Config{
  991. Region: aws.String(region),
  992. }
  993. s := session.Must(session.NewSession(c))
  994. ec2Svc := ec2.New(s)
  995. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  996. InstanceIds: []*string{
  997. aws.String(instanceId),
  998. },
  999. })
  1000. if diErr != nil {
  1001. klog.Infof("Error describing instances: %s", diErr)
  1002. continue
  1003. }
  1004. if len(di.Reservations) != 1 {
  1005. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  1006. continue
  1007. }
  1008. res := di.Reservations[0]
  1009. if len(res.Instances) != 1 {
  1010. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  1011. continue
  1012. }
  1013. inst := res.Instances[0]
  1014. for _, tag := range inst.Tags {
  1015. tagKey := *tag.Key
  1016. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  1017. if matchNum != 1 {
  1018. continue
  1019. }
  1020. return makeStructure(group)
  1021. }
  1022. }
  1023. }*/
  1024. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  1025. return makeStructure(defaultClusterName)
  1026. }
  1027. // Gets the aws key id and secret
  1028. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  1029. if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
  1030. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1031. }
  1032. // 1. Check config values first (set from frontend UI)
  1033. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  1034. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1035. Message: "AWS ServiceKey exists",
  1036. Status: true,
  1037. }
  1038. return cp.ServiceKeyName, cp.ServiceKeySecret
  1039. }
  1040. // 2. Check for secret
  1041. s, _ := aws.loadAWSAuthSecret(forceReload)
  1042. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1043. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1044. Message: "AWS ServiceKey exists",
  1045. Status: true,
  1046. }
  1047. return s.AccessKeyID, s.SecretAccessKey
  1048. }
  1049. // 3. Fall back to env vars
  1050. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1051. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1052. Message: "AWS ServiceKey exists",
  1053. Status: false,
  1054. }
  1055. } else {
  1056. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1057. Message: "AWS ServiceKey exists",
  1058. Status: true,
  1059. }
  1060. }
  1061. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1062. }
  1063. // Load once and cache the result (even on failure). This is an install time secret, so
  1064. // we don't expect the secret to change. If it does, however, we can force reload using
  1065. // the input parameter.
  1066. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1067. if !force && loadedAWSSecret {
  1068. return awsSecret, nil
  1069. }
  1070. loadedAWSSecret = true
  1071. exists, err := util.FileExists(authSecretPath)
  1072. if !exists || err != nil {
  1073. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1074. }
  1075. result, err := ioutil.ReadFile(authSecretPath)
  1076. if err != nil {
  1077. return nil, err
  1078. }
  1079. var ak AWSAccessKey
  1080. err = json.Unmarshal(result, &ak)
  1081. if err != nil {
  1082. return nil, err
  1083. }
  1084. awsSecret = &ak
  1085. return awsSecret, nil
  1086. }
  1087. func (aws *AWS) configureAWSAuth() error {
  1088. accessKeyID := aws.ServiceKeyName
  1089. accessKeySecret := aws.ServiceKeySecret
  1090. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1091. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1092. if err != nil {
  1093. return err
  1094. }
  1095. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1096. if err != nil {
  1097. return err
  1098. }
  1099. }
  1100. return nil
  1101. }
  1102. func getClusterConfig(ccFile string) (map[string]string, error) {
  1103. clusterConfig, err := os.Open(ccFile)
  1104. if err != nil {
  1105. return nil, err
  1106. }
  1107. defer clusterConfig.Close()
  1108. b, err := ioutil.ReadAll(clusterConfig)
  1109. if err != nil {
  1110. return nil, err
  1111. }
  1112. var clusterConf map[string]string
  1113. err = json.Unmarshal([]byte(b), &clusterConf)
  1114. if err != nil {
  1115. return nil, err
  1116. }
  1117. return clusterConf, nil
  1118. }
  1119. // SetKeyEnv ensures that the two environment variables necessary to configure
  1120. // a new AWS Session are set.
  1121. func (a *AWS) SetKeyEnv() error {
  1122. // TODO add this to the helm chart, mirroring the cost-model
  1123. // configPath := env.GetConfigPath()
  1124. configPath := defaultConfigPath
  1125. path := configPath + "aws.json"
  1126. if _, err := os.Stat(path); err != nil {
  1127. if os.IsNotExist(err) {
  1128. log.DedupedErrorf(5, "file %s does not exist", path)
  1129. } else {
  1130. log.DedupedErrorf(5, "other file open error: %s", err)
  1131. }
  1132. return err
  1133. }
  1134. jsonFile, err := os.Open(path)
  1135. defer jsonFile.Close()
  1136. configMap := map[string]string{}
  1137. configBytes, err := ioutil.ReadAll(jsonFile)
  1138. if err != nil {
  1139. return err
  1140. }
  1141. json.Unmarshal([]byte(configBytes), &configMap)
  1142. keyName := configMap["awsServiceKeyName"]
  1143. keySecret := configMap["awsServiceKeySecret"]
  1144. // These are required before calling NewEnvCredentials below
  1145. env.Set(env.AWSAccessKeyIDEnvVar, keyName)
  1146. env.Set(env.AWSAccessKeySecretEnvVar, keySecret)
  1147. return nil
  1148. }
  1149. func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
  1150. sess, err := session.NewSession(&aws.Config{
  1151. Region: aws.String(region),
  1152. Credentials: credentials.NewEnvCredentials(),
  1153. })
  1154. if err != nil {
  1155. return nil, err
  1156. }
  1157. ec2Svc := ec2.New(sess)
  1158. return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
  1159. }
  1160. func (a *AWS) GetAddresses() ([]byte, error) {
  1161. if err := a.SetKeyEnv(); err != nil {
  1162. return nil, err
  1163. }
  1164. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1165. errorCh := make(chan error, len(awsRegions))
  1166. var wg sync.WaitGroup
  1167. wg.Add(len(awsRegions))
  1168. // Get volumes from each AWS region
  1169. for _, r := range awsRegions {
  1170. // Fetch IP address response and send results and errors to their
  1171. // respective channels
  1172. go func(region string) {
  1173. defer wg.Done()
  1174. defer errors.HandlePanic()
  1175. // Query for first page of volume results
  1176. resp, err := a.getAddressesForRegion(region)
  1177. if err != nil {
  1178. if aerr, ok := err.(awserr.Error); ok {
  1179. switch aerr.Code() {
  1180. default:
  1181. errorCh <- aerr
  1182. }
  1183. return
  1184. } else {
  1185. errorCh <- err
  1186. return
  1187. }
  1188. }
  1189. addressCh <- resp
  1190. }(r)
  1191. }
  1192. // Close the result channels after everything has been sent
  1193. go func() {
  1194. defer errors.HandlePanic()
  1195. wg.Wait()
  1196. close(errorCh)
  1197. close(addressCh)
  1198. }()
  1199. addresses := []*ec2.Address{}
  1200. for adds := range addressCh {
  1201. addresses = append(addresses, adds.Addresses...)
  1202. }
  1203. errors := []error{}
  1204. for err := range errorCh {
  1205. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1206. errors = append(errors, err)
  1207. }
  1208. // Return error if no addresses are returned
  1209. if len(errors) > 0 && len(addresses) == 0 {
  1210. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
  1211. }
  1212. // Format the response this way to match the JSON-encoded formatting of a single response
  1213. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1214. // a "Addresss" key at the top level.
  1215. return json.Marshal(map[string][]*ec2.Address{
  1216. "Addresses": addresses,
  1217. })
  1218. }
  1219. func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1220. sess, err := session.NewSession(&aws.Config{
  1221. Region: aws.String(region),
  1222. Credentials: credentials.NewEnvCredentials(),
  1223. })
  1224. if err != nil {
  1225. return nil, err
  1226. }
  1227. ec2Svc := ec2.New(sess)
  1228. return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
  1229. MaxResults: &maxResults,
  1230. NextToken: nextToken,
  1231. })
  1232. }
  1233. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1234. func (a *AWS) GetDisks() ([]byte, error) {
  1235. if err := a.SetKeyEnv(); err != nil {
  1236. return nil, err
  1237. }
  1238. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1239. errorCh := make(chan error, len(awsRegions))
  1240. var wg sync.WaitGroup
  1241. wg.Add(len(awsRegions))
  1242. // Get volumes from each AWS region
  1243. for _, r := range awsRegions {
  1244. // Fetch volume response and send results and errors to their
  1245. // respective channels
  1246. go func(region string) {
  1247. defer wg.Done()
  1248. defer errors.HandlePanic()
  1249. // Query for first page of volume results
  1250. resp, err := a.getDisksForRegion(region, 1000, nil)
  1251. if err != nil {
  1252. if aerr, ok := err.(awserr.Error); ok {
  1253. switch aerr.Code() {
  1254. default:
  1255. errorCh <- aerr
  1256. }
  1257. return
  1258. } else {
  1259. errorCh <- err
  1260. return
  1261. }
  1262. }
  1263. volumeCh <- resp
  1264. // A NextToken indicates more pages of results. Keep querying
  1265. // until all pages are retrieved.
  1266. for resp.NextToken != nil {
  1267. resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
  1268. if err != nil {
  1269. if aerr, ok := err.(awserr.Error); ok {
  1270. switch aerr.Code() {
  1271. default:
  1272. errorCh <- aerr
  1273. }
  1274. return
  1275. } else {
  1276. errorCh <- err
  1277. return
  1278. }
  1279. }
  1280. volumeCh <- resp
  1281. }
  1282. }(r)
  1283. }
  1284. // Close the result channels after everything has been sent
  1285. go func() {
  1286. defer errors.HandlePanic()
  1287. wg.Wait()
  1288. close(errorCh)
  1289. close(volumeCh)
  1290. }()
  1291. volumes := []*ec2.Volume{}
  1292. for vols := range volumeCh {
  1293. volumes = append(volumes, vols.Volumes...)
  1294. }
  1295. errors := []error{}
  1296. for err := range errorCh {
  1297. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1298. errors = append(errors, err)
  1299. }
  1300. // Return error if no volumes are returned
  1301. if len(errors) > 0 && len(volumes) == 0 {
  1302. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
  1303. }
  1304. // Format the response this way to match the JSON-encoded formatting of a single response
  1305. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1306. // a "Volumes" key at the top level.
  1307. return json.Marshal(map[string][]*ec2.Volume{
  1308. "Volumes": volumes,
  1309. })
  1310. }
  1311. // ConvertToGlueColumnFormat takes a string and runs through various regex
  1312. // and string replacement statements to convert it to a format compatible
  1313. // with AWS Glue and Athena column names.
  1314. // Following guidance from AWS provided here ('Column Names' section):
  1315. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  1316. // It returns a string containing the column name in proper column name format and length.
  1317. func ConvertToGlueColumnFormat(column_name string) string {
  1318. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  1319. // An underscore is added in front of uppercase letters
  1320. capital_underscore := regexp.MustCompile(`[A-Z]`)
  1321. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  1322. // Any non-alphanumeric characters are replaced with an underscore
  1323. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  1324. final = no_space_punc.ReplaceAllString(final, "_")
  1325. // Duplicate underscores are removed
  1326. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  1327. final = no_dup_underscore.ReplaceAllString(final, "_")
  1328. // Any leading and trailing underscores are removed
  1329. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  1330. final = no_front_end_underscore.ReplaceAllString(final, "")
  1331. // Uppercase to lowercase
  1332. final = strings.ToLower(final)
  1333. // Longer column name than expected - remove _ left to right
  1334. allowed_col_len := 128
  1335. undersc_to_remove := len(final) - allowed_col_len
  1336. if undersc_to_remove > 0 {
  1337. final = strings.Replace(final, "_", "", undersc_to_remove)
  1338. }
  1339. // If removing all of the underscores still didn't
  1340. // make the column name < 128 characters, trim it!
  1341. if len(final) > allowed_col_len {
  1342. final = final[:allowed_col_len]
  1343. }
  1344. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  1345. return final
  1346. }
  1347. func generateAWSGroupBy(lastIdx int) string {
  1348. sequence := []string{}
  1349. for i := 1; i < lastIdx+1; i++ {
  1350. sequence = append(sequence, strconv.Itoa(i))
  1351. }
  1352. return strings.Join(sequence, ",")
  1353. }
  1354. func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput, *athena.Athena, error) {
  1355. customPricing, err := a.GetConfig()
  1356. if err != nil {
  1357. return nil, nil, err
  1358. }
  1359. if customPricing.ServiceKeyName != "" {
  1360. err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1361. if err != nil {
  1362. return nil, nil, err
  1363. }
  1364. err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1365. if err != nil {
  1366. return nil, nil, err
  1367. }
  1368. }
  1369. region := aws.String(customPricing.AthenaRegion)
  1370. resultsBucket := customPricing.AthenaBucketName
  1371. database := customPricing.AthenaDatabase
  1372. c := &aws.Config{
  1373. Region: region,
  1374. }
  1375. s := session.Must(session.NewSession(c))
  1376. svc := athena.New(s)
  1377. if customPricing.MasterPayerARN != "" {
  1378. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1379. svc = athena.New(s, &aws.Config{
  1380. Region: region,
  1381. Credentials: creds,
  1382. })
  1383. }
  1384. var e athena.StartQueryExecutionInput
  1385. var r athena.ResultConfiguration
  1386. r.SetOutputLocation(resultsBucket)
  1387. e.SetResultConfiguration(&r)
  1388. e.SetQueryString(query)
  1389. var q athena.QueryExecutionContext
  1390. q.SetDatabase(database)
  1391. e.SetQueryExecutionContext(&q)
  1392. res, err := svc.StartQueryExecution(&e)
  1393. if err != nil {
  1394. return nil, svc, err
  1395. }
  1396. klog.V(2).Infof("StartQueryExecution result:")
  1397. klog.V(2).Infof(res.GoString())
  1398. var qri athena.GetQueryExecutionInput
  1399. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1400. var qrop *athena.GetQueryExecutionOutput
  1401. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1402. for {
  1403. qrop, err = svc.GetQueryExecution(&qri)
  1404. if err != nil {
  1405. return nil, svc, err
  1406. }
  1407. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1408. break
  1409. }
  1410. time.Sleep(duration)
  1411. }
  1412. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1413. var ip athena.GetQueryResultsInput
  1414. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1415. return &ip, svc, nil
  1416. } else {
  1417. return nil, svc, fmt.Errorf("No results available for %s", query)
  1418. }
  1419. }
  1420. func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
  1421. customPricing, err := a.GetConfig()
  1422. if err != nil {
  1423. return nil, err
  1424. }
  1425. if customPricing.ServiceKeyName != "" {
  1426. err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1427. if err != nil {
  1428. return nil, err
  1429. }
  1430. err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1431. if err != nil {
  1432. return nil, err
  1433. }
  1434. }
  1435. region := aws.String(customPricing.AthenaRegion)
  1436. resultsBucket := customPricing.AthenaBucketName
  1437. database := customPricing.AthenaDatabase
  1438. c := &aws.Config{
  1439. Region: region,
  1440. }
  1441. s := session.Must(session.NewSession(c))
  1442. svc := athena.New(s)
  1443. if customPricing.MasterPayerARN != "" {
  1444. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1445. svc = athena.New(s, &aws.Config{
  1446. Region: region,
  1447. Credentials: creds,
  1448. })
  1449. }
  1450. var e athena.StartQueryExecutionInput
  1451. var r athena.ResultConfiguration
  1452. r.SetOutputLocation(resultsBucket)
  1453. e.SetResultConfiguration(&r)
  1454. e.SetQueryString(query)
  1455. var q athena.QueryExecutionContext
  1456. q.SetDatabase(database)
  1457. e.SetQueryExecutionContext(&q)
  1458. res, err := svc.StartQueryExecution(&e)
  1459. if err != nil {
  1460. return nil, err
  1461. }
  1462. klog.V(2).Infof("StartQueryExecution result:")
  1463. klog.V(2).Infof(res.GoString())
  1464. var qri athena.GetQueryExecutionInput
  1465. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1466. var qrop *athena.GetQueryExecutionOutput
  1467. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1468. for {
  1469. qrop, err = svc.GetQueryExecution(&qri)
  1470. if err != nil {
  1471. return nil, err
  1472. }
  1473. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1474. break
  1475. }
  1476. time.Sleep(duration)
  1477. }
  1478. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1479. var ip athena.GetQueryResultsInput
  1480. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1481. return svc.GetQueryResults(&ip)
  1482. } else {
  1483. return nil, fmt.Errorf("No results available for %s", query)
  1484. }
  1485. }
  1486. type SavingsPlanData struct {
  1487. ResourceID string
  1488. EffectiveCost float64
  1489. SavingsPlanARN string
  1490. MostRecentDate string
  1491. }
  1492. func (a *AWS) GetSavingsPlanDataFromAthena() error {
  1493. cfg, err := a.GetConfig()
  1494. if err != nil {
  1495. return err
  1496. }
  1497. if cfg.AthenaBucketName == "" {
  1498. return fmt.Errorf("No Athena Bucket configured")
  1499. }
  1500. if a.SavingsPlanDataByInstanceID == nil {
  1501. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1502. }
  1503. tNow := time.Now()
  1504. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1505. start := tOneDayAgo.Format("2006-01-02")
  1506. end := tNow.Format("2006-01-02")
  1507. q := `SELECT
  1508. line_item_usage_start_date,
  1509. savings_plan_savings_plan_a_r_n,
  1510. line_item_resource_id,
  1511. savings_plan_savings_plan_effective_cost
  1512. FROM %s as cost_data
  1513. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1514. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1515. line_item_usage_start_date DESC`
  1516. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1517. op, err := a.QueryAthenaBillingData(query)
  1518. if err != nil {
  1519. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1520. }
  1521. klog.Infof("Fetching SavingsPlan data...")
  1522. if len(op.ResultSet.Rows) > 1 {
  1523. a.SavingsPlanDataLock.Lock()
  1524. mostRecentDate := ""
  1525. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1526. d := *r.Data[0].VarCharValue
  1527. if mostRecentDate == "" {
  1528. mostRecentDate = d
  1529. } else if mostRecentDate != d { // Get all most recent assignments
  1530. break
  1531. }
  1532. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1533. if err != nil {
  1534. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1535. }
  1536. r := &SavingsPlanData{
  1537. ResourceID: *r.Data[2].VarCharValue,
  1538. EffectiveCost: cost,
  1539. SavingsPlanARN: *r.Data[1].VarCharValue,
  1540. MostRecentDate: d,
  1541. }
  1542. a.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1543. }
  1544. klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
  1545. for k, r := range a.SavingsPlanDataByInstanceID {
  1546. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1547. }
  1548. a.SavingsPlanDataLock.Unlock()
  1549. } else {
  1550. klog.Infof("No savings plan applied instance data found")
  1551. }
  1552. return nil
  1553. }
  1554. type RIData struct {
  1555. ResourceID string
  1556. EffectiveCost float64
  1557. ReservationARN string
  1558. MostRecentDate string
  1559. }
  1560. func (a *AWS) GetReservationDataFromAthena() error {
  1561. cfg, err := a.GetConfig()
  1562. if err != nil {
  1563. return err
  1564. }
  1565. if cfg.AthenaBucketName == "" {
  1566. return fmt.Errorf("No Athena Bucket configured")
  1567. }
  1568. if a.RIPricingByInstanceID == nil {
  1569. a.RIPricingByInstanceID = make(map[string]*RIData)
  1570. }
  1571. tNow := time.Now()
  1572. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1573. start := tOneDayAgo.Format("2006-01-02")
  1574. end := tNow.Format("2006-01-02")
  1575. q := `SELECT
  1576. line_item_usage_start_date,
  1577. reservation_reservation_a_r_n,
  1578. line_item_resource_id,
  1579. reservation_effective_cost
  1580. FROM %s as cost_data
  1581. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1582. AND reservation_reservation_a_r_n <> '' ORDER BY
  1583. line_item_usage_start_date DESC`
  1584. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1585. op, err := a.QueryAthenaBillingData(query)
  1586. if err != nil {
  1587. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1588. }
  1589. klog.Infof("Fetching RI data...")
  1590. if len(op.ResultSet.Rows) > 1 {
  1591. a.RIDataLock.Lock()
  1592. mostRecentDate := ""
  1593. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1594. d := *r.Data[0].VarCharValue
  1595. if mostRecentDate == "" {
  1596. mostRecentDate = d
  1597. } else if mostRecentDate != d { // Get all most recent assignments
  1598. break
  1599. }
  1600. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1601. if err != nil {
  1602. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1603. }
  1604. r := &RIData{
  1605. ResourceID: *r.Data[2].VarCharValue,
  1606. EffectiveCost: cost,
  1607. ReservationARN: *r.Data[1].VarCharValue,
  1608. MostRecentDate: d,
  1609. }
  1610. a.RIPricingByInstanceID[r.ResourceID] = r
  1611. }
  1612. klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
  1613. for k, r := range a.RIPricingByInstanceID {
  1614. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1615. }
  1616. a.RIDataLock.Unlock()
  1617. } else {
  1618. klog.Infof("No reserved instance data found")
  1619. }
  1620. return nil
  1621. }
  1622. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  1623. // "start" and "end" are dates of the format YYYY-MM-DD
  1624. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  1625. func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
  1626. customPricing, err := a.GetConfig()
  1627. if err != nil {
  1628. return nil, err
  1629. }
  1630. formattedAggregators := []string{}
  1631. for _, agg := range aggregators {
  1632. aggregator_column_name := "resource_tags_user_" + agg
  1633. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  1634. formattedAggregators = append(formattedAggregators, aggregator_column_name)
  1635. }
  1636. aggregatorNames := strings.Join(formattedAggregators, ",")
  1637. aggregatorOr := strings.Join(formattedAggregators, " <> '' OR ")
  1638. aggregatorOr = aggregatorOr + " <> ''"
  1639. filter_column_name := "resource_tags_user_" + filterType
  1640. filter_column_name = ConvertToGlueColumnFormat(filter_column_name)
  1641. var query string
  1642. var lastIdx int
  1643. if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
  1644. lastIdx = len(formattedAggregators) + 3
  1645. groupby := generateAWSGroupBy(lastIdx)
  1646. query = fmt.Sprintf(`SELECT
  1647. CAST(line_item_usage_start_date AS DATE) as start_date,
  1648. %s,
  1649. line_item_product_code,
  1650. %s,
  1651. SUM(line_item_blended_cost) as blended_cost
  1652. FROM %s as cost_data
  1653. WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1654. GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, aggregatorOr, groupby)
  1655. } else {
  1656. lastIdx = len(formattedAggregators) + 2
  1657. groupby := generateAWSGroupBy(lastIdx)
  1658. query = fmt.Sprintf(`SELECT
  1659. CAST(line_item_usage_start_date AS DATE) as start_date,
  1660. %s,
  1661. line_item_product_code,
  1662. SUM(line_item_blended_cost) as blended_cost
  1663. FROM %s as cost_data
  1664. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1665. GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
  1666. }
  1667. var oocAllocs []*OutOfClusterAllocation
  1668. page := 0
  1669. processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
  1670. iter := op.ResultSet.Rows
  1671. if page == 0 && len(iter) > 1 {
  1672. iter = op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)]
  1673. }
  1674. page++
  1675. for _, r := range iter {
  1676. cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
  1677. if err != nil {
  1678. klog.Infof("Error converting cost `%s` from float ", *r.Data[lastIdx].VarCharValue)
  1679. }
  1680. environment := ""
  1681. for _, d := range r.Data[1 : len(formattedAggregators)+1] {
  1682. if *d.VarCharValue != "" {
  1683. environment = *d.VarCharValue // just set to the first nonempty match
  1684. }
  1685. break
  1686. }
  1687. ooc := &OutOfClusterAllocation{
  1688. Aggregator: strings.Join(aggregators, ","),
  1689. Environment: environment,
  1690. Service: *r.Data[len(formattedAggregators)+1].VarCharValue,
  1691. Cost: cost,
  1692. }
  1693. oocAllocs = append(oocAllocs, ooc)
  1694. }
  1695. return true
  1696. }
  1697. klog.V(3).Infof("Running Query: %s", query)
  1698. ip, svc, err := a.QueryAthenaPaginated(query)
  1699. athenaErr := svc.GetQueryResultsPages(ip, processResults)
  1700. if athenaErr != nil {
  1701. klog.Infof("RETURNING ATHENA ERROR")
  1702. return nil, athenaErr
  1703. }
  1704. if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
  1705. gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
  1706. if err != nil {
  1707. klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
  1708. }
  1709. gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
  1710. if err != nil {
  1711. klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
  1712. }
  1713. oocAllocs = append(oocAllocs, gcpOOC...)
  1714. }
  1715. return oocAllocs, nil
  1716. }
  1717. // QuerySQL can query a properly configured Athena database.
  1718. // Used to fetch billing data.
  1719. // Requires a json config in /var/configs with key region, output, and database.
  1720. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1721. customPricing, err := a.GetConfig()
  1722. if err != nil {
  1723. return nil, err
  1724. }
  1725. if customPricing.ServiceKeyName != "" {
  1726. err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1727. if err != nil {
  1728. return nil, err
  1729. }
  1730. err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1731. if err != nil {
  1732. return nil, err
  1733. }
  1734. }
  1735. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1736. if err != nil {
  1737. return nil, err
  1738. }
  1739. defer athenaConfigs.Close()
  1740. b, err := ioutil.ReadAll(athenaConfigs)
  1741. if err != nil {
  1742. return nil, err
  1743. }
  1744. var athenaConf map[string]string
  1745. json.Unmarshal([]byte(b), &athenaConf)
  1746. region := aws.String(customPricing.AthenaRegion)
  1747. resultsBucket := customPricing.AthenaBucketName
  1748. database := customPricing.AthenaDatabase
  1749. c := &aws.Config{
  1750. Region: region,
  1751. }
  1752. s := session.Must(session.NewSession(c))
  1753. svc := athena.New(s)
  1754. var e athena.StartQueryExecutionInput
  1755. var r athena.ResultConfiguration
  1756. r.SetOutputLocation(resultsBucket)
  1757. e.SetResultConfiguration(&r)
  1758. e.SetQueryString(query)
  1759. var q athena.QueryExecutionContext
  1760. q.SetDatabase(database)
  1761. e.SetQueryExecutionContext(&q)
  1762. res, err := svc.StartQueryExecution(&e)
  1763. if err != nil {
  1764. return nil, err
  1765. }
  1766. klog.V(2).Infof("StartQueryExecution result:")
  1767. klog.V(2).Infof(res.GoString())
  1768. var qri athena.GetQueryExecutionInput
  1769. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1770. var qrop *athena.GetQueryExecutionOutput
  1771. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1772. for {
  1773. qrop, err = svc.GetQueryExecution(&qri)
  1774. if err != nil {
  1775. return nil, err
  1776. }
  1777. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1778. break
  1779. }
  1780. time.Sleep(duration)
  1781. }
  1782. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1783. var ip athena.GetQueryResultsInput
  1784. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1785. op, err := svc.GetQueryResults(&ip)
  1786. if err != nil {
  1787. return nil, err
  1788. }
  1789. b, err := json.Marshal(op.ResultSet)
  1790. if err != nil {
  1791. return nil, err
  1792. }
  1793. return b, nil
  1794. }
  1795. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1796. }
  1797. type spotInfo struct {
  1798. Timestamp string `csv:"Timestamp"`
  1799. UsageType string `csv:"UsageType"`
  1800. Operation string `csv:"Operation"`
  1801. InstanceID string `csv:"InstanceID"`
  1802. MyBidID string `csv:"MyBidID"`
  1803. MyMaxPrice string `csv:"MyMaxPrice"`
  1804. MarketPrice string `csv:"MarketPrice"`
  1805. Charge string `csv:"Charge"`
  1806. Version string `csv:"Version"`
  1807. }
  1808. type fnames []*string
  1809. func (f fnames) Len() int {
  1810. return len(f)
  1811. }
  1812. func (f fnames) Swap(i, j int) {
  1813. f[i], f[j] = f[j], f[i]
  1814. }
  1815. func (f fnames) Less(i, j int) bool {
  1816. key1 := strings.Split(*f[i], ".")
  1817. key2 := strings.Split(*f[j], ".")
  1818. t1, err := time.Parse("2006-01-02-15", key1[1])
  1819. if err != nil {
  1820. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1821. return false
  1822. }
  1823. t2, err := time.Parse("2006-01-02-15", key2[1])
  1824. if err != nil {
  1825. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1826. return false
  1827. }
  1828. return t1.Before(t2)
  1829. }
  1830. func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1831. if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
  1832. a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1833. }
  1834. // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1835. if accessKeyID != "" && accessKeySecret != "" {
  1836. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1837. if err != nil {
  1838. return nil, err
  1839. }
  1840. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1841. if err != nil {
  1842. return nil, err
  1843. }
  1844. }
  1845. s3Prefix := projectID
  1846. if len(prefix) != 0 {
  1847. s3Prefix = prefix + "/" + s3Prefix
  1848. }
  1849. c := aws.NewConfig().WithRegion(region)
  1850. s := session.Must(session.NewSession(c))
  1851. s3Svc := s3.New(s)
  1852. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1853. tNow := time.Now()
  1854. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1855. ls := &s3.ListObjectsInput{
  1856. Bucket: aws.String(bucket),
  1857. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1858. }
  1859. ls2 := &s3.ListObjectsInput{
  1860. Bucket: aws.String(bucket),
  1861. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1862. }
  1863. lso, err := s3Svc.ListObjects(ls)
  1864. if err != nil {
  1865. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1866. Message: "Bucket List Permissions Available",
  1867. Status: false,
  1868. AdditionalInfo: err.Error(),
  1869. }
  1870. return nil, err
  1871. } else {
  1872. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1873. Message: "Bucket List Permissions Available",
  1874. Status: true,
  1875. }
  1876. }
  1877. lsoLen := len(lso.Contents)
  1878. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1879. if lsoLen == 0 {
  1880. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1881. }
  1882. lso2, err := s3Svc.ListObjects(ls2)
  1883. if err != nil {
  1884. return nil, err
  1885. }
  1886. lso2Len := len(lso2.Contents)
  1887. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1888. if lso2Len == 0 {
  1889. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1890. }
  1891. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1892. var keys []*string
  1893. for _, obj := range lso.Contents {
  1894. keys = append(keys, obj.Key)
  1895. }
  1896. for _, obj := range lso2.Contents {
  1897. keys = append(keys, obj.Key)
  1898. }
  1899. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1900. header, err := csvutil.Header(spotInfo{}, "csv")
  1901. if err != nil {
  1902. return nil, err
  1903. }
  1904. fieldsPerRecord := len(header)
  1905. spots := make(map[string]*spotInfo)
  1906. for _, key := range keys {
  1907. getObj := &s3.GetObjectInput{
  1908. Bucket: aws.String(bucket),
  1909. Key: key,
  1910. }
  1911. buf := aws.NewWriteAtBuffer([]byte{})
  1912. _, err := downloader.Download(buf, getObj)
  1913. if err != nil {
  1914. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1915. Message: "Object Get Permissions Available",
  1916. Status: false,
  1917. AdditionalInfo: err.Error(),
  1918. }
  1919. return nil, err
  1920. } else {
  1921. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1922. Message: "Object Get Permissions Available",
  1923. Status: true,
  1924. }
  1925. }
  1926. r := bytes.NewReader(buf.Bytes())
  1927. gr, err := gzip.NewReader(r)
  1928. if err != nil {
  1929. return nil, err
  1930. }
  1931. csvReader := csv.NewReader(gr)
  1932. csvReader.Comma = '\t'
  1933. csvReader.FieldsPerRecord = fieldsPerRecord
  1934. dec, err := csvutil.NewDecoder(csvReader, header...)
  1935. if err != nil {
  1936. return nil, err
  1937. }
  1938. var foundVersion string
  1939. for {
  1940. spot := spotInfo{}
  1941. err := dec.Decode(&spot)
  1942. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1943. if err == io.EOF {
  1944. break
  1945. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1946. rec := dec.Record()
  1947. // the first two "Record()" will be the comment lines
  1948. // and they show up as len() == 1
  1949. // the first of which is "#Version"
  1950. // the second of which is "#Fields: "
  1951. if len(rec) != 1 {
  1952. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1953. continue
  1954. }
  1955. if len(foundVersion) == 0 {
  1956. spotFeedVersion := rec[0]
  1957. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1958. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1959. if matches != nil {
  1960. foundVersion = matches[1]
  1961. if foundVersion != supportedSpotFeedVersion {
  1962. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1963. break
  1964. }
  1965. }
  1966. continue
  1967. } else if strings.Index(rec[0], "#") == 0 {
  1968. continue
  1969. } else {
  1970. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1971. continue
  1972. }
  1973. } else if err != nil {
  1974. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1975. continue
  1976. }
  1977. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  1978. spots[spot.InstanceID] = &spot
  1979. }
  1980. gr.Close()
  1981. }
  1982. return spots, nil
  1983. }
  1984. func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1985. /*
  1986. numReserved := len(a.ReservedInstances)
  1987. // Early return if no reserved instance data loaded
  1988. if numReserved == 0 {
  1989. klog.V(4).Infof("[Reserved] No Reserved Instances")
  1990. return
  1991. }
  1992. cfg, err := a.GetConfig()
  1993. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  1994. if err != nil {
  1995. klog.V(3).Infof("Could not parse default cpu price")
  1996. defaultCPU = 0.031611
  1997. }
  1998. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  1999. if err != nil {
  2000. klog.V(3).Infof("Could not parse default ram price")
  2001. defaultRAM = 0.004237
  2002. }
  2003. cpuToRAMRatio := defaultCPU / defaultRAM
  2004. now := time.Now()
  2005. instances := make(map[string][]*AWSReservedInstance)
  2006. for _, r := range a.ReservedInstances {
  2007. if now.Before(r.StartDate) || now.After(r.EndDate) {
  2008. klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
  2009. continue
  2010. }
  2011. _, ok := instances[r.Region]
  2012. if !ok {
  2013. instances[r.Region] = []*AWSReservedInstance{r}
  2014. } else {
  2015. instances[r.Region] = append(instances[r.Region], r)
  2016. }
  2017. }
  2018. awsNodes := make(map[string]*v1.Node)
  2019. currentNodes := a.Clientset.GetAllNodes()
  2020. // Create a node name -> node map
  2021. for _, awsNode := range currentNodes {
  2022. awsNodes[awsNode.GetName()] = awsNode
  2023. }
  2024. // go through all provider nodes using k8s nodes for region
  2025. for nodeName, node := range nodes {
  2026. // Reset reserved allocation to prevent double allocation
  2027. node.Reserved = nil
  2028. kNode, ok := awsNodes[nodeName]
  2029. if !ok {
  2030. klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
  2031. continue
  2032. }
  2033. nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
  2034. if !ok {
  2035. klog.V(1).Infof("[Reserved] Could not find node region")
  2036. continue
  2037. }
  2038. reservedInstances, ok := instances[nodeRegion]
  2039. if !ok {
  2040. klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
  2041. continue
  2042. }
  2043. // Determine the InstanceType of the node
  2044. instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
  2045. if !ok {
  2046. continue
  2047. }
  2048. ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
  2049. if err != nil {
  2050. continue
  2051. }
  2052. ramGB := ramBytes / 1024 / 1024 / 1024
  2053. cpu, err := strconv.ParseFloat(node.VCPU, 64)
  2054. if err != nil {
  2055. continue
  2056. }
  2057. ramMultiple := cpu*cpuToRAMRatio + ramGB
  2058. node.Reserved = &ReservedInstanceData{
  2059. ReservedCPU: 0,
  2060. ReservedRAM: 0,
  2061. }
  2062. for i, reservedInstance := range reservedInstances {
  2063. if reservedInstance.InstanceType == instanceType {
  2064. // Use < 0 to mark as ALL
  2065. node.Reserved.ReservedCPU = -1
  2066. node.Reserved.ReservedRAM = -1
  2067. // Set Costs based on CPU/RAM ratios
  2068. ramPrice := reservedInstance.PricePerHour / ramMultiple
  2069. node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
  2070. node.Reserved.RAMCost = ramPrice
  2071. // Remove the reserve from the temporary slice to prevent
  2072. // being reallocated
  2073. instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
  2074. break
  2075. }
  2076. }
  2077. }*/
  2078. }
  2079. type AWSReservedInstance struct {
  2080. Zone string
  2081. Region string
  2082. InstanceType string
  2083. InstanceCount int64
  2084. InstanceTenacy string
  2085. StartDate time.Time
  2086. EndDate time.Time
  2087. PricePerHour float64
  2088. }
  2089. func (ari *AWSReservedInstance) String() string {
  2090. return fmt.Sprintf("[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]", ari.Zone, ari.Region, ari.InstanceType, ari.InstanceCount, ari.InstanceTenacy, ari.StartDate, ari.EndDate, ari.PricePerHour)
  2091. }
  2092. func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
  2093. return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
  2094. }
  2095. func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
  2096. var pricePerHour float64
  2097. if len(ri.RecurringCharges) > 0 {
  2098. for _, rc := range ri.RecurringCharges {
  2099. if isReservedInstanceHourlyPrice(rc) {
  2100. pricePerHour = *rc.Amount
  2101. break
  2102. }
  2103. }
  2104. }
  2105. // If we're still unable to resolve hourly price, try fixed -> hourly
  2106. if pricePerHour == 0 {
  2107. if ri.Duration != nil && ri.FixedPrice != nil {
  2108. var durHours float64
  2109. durSeconds := float64(*ri.Duration)
  2110. fixedPrice := float64(*ri.FixedPrice)
  2111. if durSeconds != 0 && fixedPrice != 0 {
  2112. durHours = durSeconds / 60 / 60
  2113. pricePerHour = fixedPrice / durHours
  2114. }
  2115. }
  2116. }
  2117. if pricePerHour == 0 {
  2118. return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
  2119. }
  2120. return pricePerHour, nil
  2121. }
  2122. func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
  2123. c := &aws.Config{
  2124. Region: aws.String(region),
  2125. }
  2126. s := session.Must(session.NewSession(c))
  2127. svc := ec2.New(s)
  2128. response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
  2129. if err != nil {
  2130. return nil, err
  2131. }
  2132. var reservedInstances []*AWSReservedInstance
  2133. for _, ri := range response.ReservedInstances {
  2134. var zone string
  2135. if ri.AvailabilityZone != nil {
  2136. zone = *ri.AvailabilityZone
  2137. }
  2138. pricePerHour, err := getReservedInstancePrice(ri)
  2139. if err != nil {
  2140. klog.V(1).Infof("Error Resolving Price: %s", err.Error())
  2141. continue
  2142. }
  2143. reservedInstances = append(reservedInstances, &AWSReservedInstance{
  2144. Zone: zone,
  2145. Region: region,
  2146. InstanceType: *ri.InstanceType,
  2147. InstanceCount: *ri.InstanceCount,
  2148. InstanceTenacy: *ri.InstanceTenancy,
  2149. StartDate: *ri.Start,
  2150. EndDate: *ri.End,
  2151. PricePerHour: pricePerHour,
  2152. })
  2153. }
  2154. return reservedInstances, nil
  2155. }
  2156. func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
  2157. err := a.configureAWSAuth()
  2158. if err != nil {
  2159. return nil, fmt.Errorf("Error Configuring aws auth: %s", err.Error())
  2160. }
  2161. var reservedInstances []*AWSReservedInstance
  2162. nodes := a.Clientset.GetAllNodes()
  2163. regionsSeen := make(map[string]bool)
  2164. for _, node := range nodes {
  2165. region, ok := node.Labels[v1.LabelZoneRegion]
  2166. if !ok {
  2167. continue
  2168. }
  2169. if regionsSeen[region] {
  2170. continue
  2171. }
  2172. ris, err := getRegionReservedInstances(region)
  2173. if err != nil {
  2174. klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
  2175. continue
  2176. }
  2177. regionsSeen[region] = true
  2178. reservedInstances = append(reservedInstances, ris...)
  2179. }
  2180. return reservedInstances, nil
  2181. }
  2182. func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  2183. checks := []*ServiceAccountCheck{}
  2184. for _, v := range a.ServiceAccountChecks {
  2185. checks = append(checks, v)
  2186. }
  2187. return &ServiceAccountStatus{
  2188. Checks: checks,
  2189. }
  2190. }
  2191. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  2192. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  2193. }
  2194. func (aws *AWS) ParseID(id string) string {
  2195. // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  2196. rx := regexp.MustCompile("aws://[^/]*/[^/]*/([^/]+)")
  2197. match := rx.FindStringSubmatch(id)
  2198. if len(match) < 2 {
  2199. if id != "" {
  2200. log.Infof("awsprovider.ParseID: failed to parse %s", id)
  2201. }
  2202. return id
  2203. }
  2204. return match[1]
  2205. }
  2206. func (aws *AWS) ParsePVID(id string) string {
  2207. rx := regexp.MustCompile("aws:/[^/]*/[^/]*/([^/]+)") // Capture "vol-0fc54c5e83b8d2b76" from "aws://us-east-2a/vol-0fc54c5e83b8d2b76"
  2208. match := rx.FindStringSubmatch(id)
  2209. if len(match) < 2 {
  2210. if id != "" {
  2211. log.Infof("awsprovider.ParseID: failed to parse %s", id)
  2212. }
  2213. return id
  2214. }
  2215. return match[1]
  2216. }