awsprovider.go 68 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/csv"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/aws/aws-sdk-go/aws/endpoints"
  18. "k8s.io/klog"
  19. "github.com/kubecost/cost-model/pkg/clustercache"
  20. "github.com/kubecost/cost-model/pkg/env"
  21. "github.com/kubecost/cost-model/pkg/errors"
  22. "github.com/kubecost/cost-model/pkg/log"
  23. "github.com/kubecost/cost-model/pkg/util"
  24. "github.com/kubecost/cost-model/pkg/util/cloudutil"
  25. "github.com/kubecost/cost-model/pkg/util/fileutil"
  26. "github.com/kubecost/cost-model/pkg/util/json"
  27. "github.com/aws/aws-sdk-go/aws"
  28. "github.com/aws/aws-sdk-go/aws/awserr"
  29. "github.com/aws/aws-sdk-go/aws/credentials"
  30. "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
  31. "github.com/aws/aws-sdk-go/aws/session"
  32. "github.com/aws/aws-sdk-go/service/athena"
  33. "github.com/aws/aws-sdk-go/service/ec2"
  34. "github.com/aws/aws-sdk-go/service/s3"
  35. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  36. awsV2 "github.com/aws/aws-sdk-go-v2/aws"
  37. "github.com/jszwec/csvutil"
  38. v1 "k8s.io/api/core/v1"
  39. )
  const (
  	// supportedSpotFeedVersion is the spot data feed version string this
  	// provider is written against.
  	// NOTE(review): its use site is outside this section — confirm.
  	supportedSpotFeedVersion = "1"

  	// SpotInfoUpdateType selects the spot-feed branch of UpdateConfig.
  	SpotInfoUpdateType = "spotinfo"
  	// AthenaInfoUpdateType selects the Athena branch of UpdateConfig.
  	AthenaInfoUpdateType = "athenainfo"

  	// PreemptibleType is the usage-type suffix appended to a node's feature
  	// key when the node is identified as a spot instance.
  	PreemptibleType = "preemptible"

  	// APIPricingSource names the public pricing API as a pricing source.
  	APIPricingSource = "Public API"
  	// SpotPricingSource names the spot data feed as a pricing source.
  	SpotPricingSource = "Spot Data Feed"
  	// ReservedInstancePricingSource names the reservation / savings plan
  	// data as a pricing source.
  	ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
  )
  47. func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
  48. sources := make(map[string]*PricingSource)
  49. sps := &PricingSource{
  50. Name: SpotPricingSource,
  51. }
  52. sps.Error = ""
  53. if aws.SpotPricingError != nil {
  54. sps.Error = aws.SpotPricingError.Error()
  55. }
  56. if sps.Error != "" {
  57. sps.Available = false
  58. } else if len(aws.SpotPricingByInstanceID) > 0 {
  59. sps.Available = true
  60. } else {
  61. sps.Error = "No spot instances detected"
  62. }
  63. sources[SpotPricingSource] = sps
  64. rps := &PricingSource{
  65. Name: ReservedInstancePricingSource,
  66. }
  67. rps.Error = ""
  68. if aws.RIPricingError != nil {
  69. rps.Error = aws.RIPricingError.Error()
  70. }
  71. if rps.Error != "" {
  72. rps.Available = false
  73. } else {
  74. rps.Available = true
  75. }
  76. sources[ReservedInstancePricingSource] = rps
  77. return sources
  78. }
  const (
  	// SpotRefreshDuration is how often spot data is refreshed.
  	SpotRefreshDuration = 15 * time.Minute

  	// defaultConfigPath is the default directory for configuration files.
  	// NOTE(review): its use site is outside this section — confirm.
  	defaultConfigPath = "/var/configs/"
  )
  // awsRegions lists the AWS region identifiers known to this provider.
  // The order of the entries is preserved from the original declaration.
  var awsRegions = []string{
  	// United States
  	"us-east-2", "us-east-1", "us-west-1", "us-west-2",
  	// Asia Pacific
  	"ap-east-1", "ap-south-1", "ap-northeast-3", "ap-northeast-2",
  	"ap-southeast-1", "ap-southeast-2", "ap-northeast-1",
  	// Canada
  	"ca-central-1",
  	// China
  	"cn-north-1", "cn-northwest-1",
  	// Europe
  	"eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-north-1",
  	// Middle East
  	"me-south-1",
  	// South America
  	"sa-east-1",
  	// GovCloud
  	"us-gov-east-1", "us-gov-west-1",
  }
  107. // AWS represents an Amazon Provider
  108. type AWS struct {
  109. Pricing map[string]*AWSProductTerms
  110. SpotPricingByInstanceID map[string]*spotInfo
  111. SpotPricingUpdatedAt *time.Time
  112. SpotRefreshRunning bool
  113. SpotPricingLock sync.RWMutex
  114. SpotPricingError error
  115. RIPricingByInstanceID map[string]*RIData
  116. RIPricingError error
  117. RIDataRunning bool
  118. RIDataLock sync.RWMutex
  119. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  120. SavingsPlanDataRunning bool
  121. SavingsPlanDataLock sync.RWMutex
  122. ValidPricingKeys map[string]bool
  123. Clientset clustercache.ClusterCache
  124. BaseCPUPrice string
  125. BaseRAMPrice string
  126. BaseGPUPrice string
  127. BaseSpotCPUPrice string
  128. BaseSpotRAMPrice string
  129. BaseSpotGPUPrice string
  130. SpotLabelName string
  131. SpotLabelValue string
  132. SpotDataRegion string
  133. SpotDataBucket string
  134. SpotDataPrefix string
  135. ProjectID string
  136. DownloadPricingDataLock sync.RWMutex
  137. Config *ProviderConfig
  138. ServiceAccountChecks map[string]*ServiceAccountCheck
  139. clusterManagementPrice float64
  140. clusterProvisioner string
  141. *CustomProvider
  142. }
  // AWSAccessKey is an AWS access key ID / secret pair, with the JSON field
  // names "aws_access_key_id" and "aws_secret_access_key".
  type AWSAccessKey struct {
  	AccessKeyID     string `json:"aws_access_key_id"`
  	SecretAccessKey string `json:"aws_secret_access_key"`
  }
  147. // Retrieve returns a set of awsV2 credentials using the AWSAccessKey's key and secret.
  148. // This fullfils the awsV2.CredentialsProvider interface contract.
  149. func (accessKey AWSAccessKey) Retrieve(ctx context.Context) (awsV2.Credentials, error) {
  150. return awsV2.Credentials{
  151. AccessKeyID: accessKey.AccessKeyID,
  152. SecretAccessKey: accessKey.SecretAccessKey,
  153. }, nil
  154. }
  155. // AWSPricing maps a k8s node to an AWS Pricing "product"
  156. type AWSPricing struct {
  157. Products map[string]*AWSProduct `json:"products"`
  158. Terms AWSPricingTerms `json:"terms"`
  159. }
  160. // AWSProduct represents a purchased SKU
  161. type AWSProduct struct {
  162. Sku string `json:"sku"`
  163. Attributes AWSProductAttributes `json:"attributes"`
  164. }
  // AWSProductAttributes holds the product metadata used to map a product to
  // a node. Every attribute is decoded as a string.
  type AWSProductAttributes struct {
  	Location        string `json:"location"`
  	InstanceType    string `json:"instanceType"`
  	Memory          string `json:"memory"`
  	Storage         string `json:"storage"`
  	VCpu            string `json:"vcpu"`
  	UsageType       string `json:"usagetype"`
  	OperatingSystem string `json:"operatingSystem"`
  	PreInstalledSw  string `json:"preInstalledSw"`
  	InstanceFamily  string `json:"instanceFamily"`
  	CapacityStatus  string `json:"capacitystatus"`
  	// GPU is the number of GPUs on the instance.
  	GPU string `json:"gpu"`
  }
  179. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  180. type AWSPricingTerms struct {
  181. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  182. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  183. }
  184. // AWSOfferTerm is a sku extension used to pay for the node.
  185. type AWSOfferTerm struct {
  186. Sku string `json:"sku"`
  187. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  188. }
  189. func (ot *AWSOfferTerm) String() string {
  190. var strs []string
  191. for k, rc := range ot.PriceDimensions {
  192. strs = append(strs, fmt.Sprintf("%s:%s", k, rc.String()))
  193. }
  194. return fmt.Sprintf("%s:%s", ot.Sku, strings.Join(strs, ","))
  195. }
  // AWSRateCode encodes data about the price of a product: the unit being
  // priced and the price per unit in each supported currency.
  type AWSRateCode struct {
  	Unit         string          `json:"unit"`
  	PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  }

  // String renders the rate code as "{unit: <unit>, pricePerUnit: <price>}".
  // The closing brace was missing from the original format string, which left
  // the output unbalanced.
  func (rc *AWSRateCode) String() string {
  	return fmt.Sprintf("{unit: %s, pricePerUnit: %v}", rc.Unit, rc.PricePerUnit)
  }

  // AWSCurrencyCode is the localized currency. Only USD and CNY prices are
  // decoded (TODO: support other currencies).
  type AWSCurrencyCode struct {
  	USD string `json:"USD,omitempty"`
  	CNY string `json:"CNY,omitempty"`
  }
  209. // AWSProductTerms represents the full terms of the product
  210. type AWSProductTerms struct {
  211. Sku string `json:"sku"`
  212. OnDemand *AWSOfferTerm `json:"OnDemand"`
  213. Reserved *AWSOfferTerm `json:"Reserved"`
  214. Memory string `json:"memory"`
  215. Storage string `json:"storage"`
  216. VCpu string `json:"vcpu"`
  217. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  218. PV *PV `json:"pv"`
  219. }
  const (
  	// ClusterIdEnvVar is the environment variable in which one can manually
  	// set the ClusterId.
  	ClusterIdEnvVar = "AWS_CLUSTER_ID"

  	// OnDemandRateCode is appended to a node SKU.
  	OnDemandRateCode = ".JRTCKXETXF"
  	// OnDemandRateCodeCn is the "Cn" variant of OnDemandRateCode.
  	// NOTE(review): presumably for the China pricing endpoint — confirm.
  	OnDemandRateCodeCn = ".99YE2YK9UR"

  	// ReservedRateCode is appended to a node SKU.
  	ReservedRateCode = ".38NPMPTW36"

  	// HourlyRateCode is appended to a node SKU.
  	HourlyRateCode = ".6YS6EN2CT7"
  	// HourlyRateCodeCn is the "Cn" variant of HourlyRateCode.
  	// NOTE(review): presumably for the China pricing endpoint — confirm.
  	HourlyRateCodeCn = ".Q7UJUT2CE6"
  )
  // volTypes maps in both directions between AWS UsageTypes and EBS volume
  // types, as they would appear in a K8s storage class name and the EC2 API.
  var volTypes = map[string]string{
  	// UsageType -> EBS volume type.
  	"EBS:VolumeUsage.gp2":    "gp2",
  	"EBS:VolumeUsage":        "standard",
  	"EBS:VolumeUsage.sc1":    "sc1",
  	"EBS:VolumeP-IOPS.piops": "io1",
  	"EBS:VolumeUsage.st1":    "st1",
  	"EBS:VolumeUsage.piops":  "io1",

  	// EBS volume type -> UsageType.
  	"gp2":      "EBS:VolumeUsage.gp2",
  	"standard": "EBS:VolumeUsage",
  	"sc1":      "EBS:VolumeUsage.sc1",
  	"io1":      "EBS:VolumeUsage.piops",
  	"st1":      "EBS:VolumeUsage.st1",
  }
  // locationToRegion maps AWS region names (as they come from billing) to
  // actual region identifiers.
  //
  // "Middle East (Bahrain)" is included so that every region listed in
  // awsRegions has a location name here; without it, Bahrain locations
  // resolved to an empty region.
  var locationToRegion = map[string]string{
  	"US East (Ohio)":             "us-east-2",
  	"US East (N. Virginia)":      "us-east-1",
  	"US West (N. California)":    "us-west-1",
  	"US West (Oregon)":           "us-west-2",
  	"Asia Pacific (Hong Kong)":   "ap-east-1",
  	"Asia Pacific (Mumbai)":      "ap-south-1",
  	"Asia Pacific (Osaka-Local)": "ap-northeast-3",
  	"Asia Pacific (Seoul)":       "ap-northeast-2",
  	"Asia Pacific (Singapore)":   "ap-southeast-1",
  	"Asia Pacific (Sydney)":      "ap-southeast-2",
  	"Asia Pacific (Tokyo)":       "ap-northeast-1",
  	"Canada (Central)":           "ca-central-1",
  	"China (Beijing)":            "cn-north-1",
  	"China (Ningxia)":            "cn-northwest-1",
  	"EU (Frankfurt)":             "eu-central-1",
  	"EU (Ireland)":               "eu-west-1",
  	"EU (London)":                "eu-west-2",
  	"EU (Paris)":                 "eu-west-3",
  	"EU (Stockholm)":             "eu-north-1",
  	"Middle East (Bahrain)":      "me-south-1",
  	"South America (Sao Paulo)":  "sa-east-1",
  	"AWS GovCloud (US-East)":     "us-gov-east-1",
  	"AWS GovCloud (US-West)":     "us-gov-west-1",
  }
  // regionToBillingRegionCode maps a region identifier to a short billing
  // region code. us-east-1 and the two China regions map to the empty string.
  // NOTE(review): the use site is outside this section — confirm how the
  // empty codes are interpreted.
  var regionToBillingRegionCode = map[string]string{
  	// United States
  	"us-east-2": "USE2",
  	"us-east-1": "",
  	"us-west-1": "USW1",
  	"us-west-2": "USW2",

  	// Asia Pacific
  	"ap-east-1":      "APE1",
  	"ap-south-1":     "APS3",
  	"ap-northeast-3": "APN3",
  	"ap-northeast-2": "APN2",
  	"ap-southeast-1": "APS1",
  	"ap-southeast-2": "APS2",
  	"ap-northeast-1": "APN1",

  	// Canada and China
  	"ca-central-1":   "CAN1",
  	"cn-north-1":     "",
  	"cn-northwest-1": "",

  	// Europe
  	"eu-central-1": "EUC1",
  	"eu-west-1":    "EU",
  	"eu-west-2":    "EUW2",
  	"eu-west-3":    "EUW3",
  	"eu-north-1":   "EUN1",

  	// South America and GovCloud
  	"sa-east-1":     "SAE1",
  	"us-gov-east-1": "UGE1",
  	"us-gov-west-1": "UGW1",
  }
  296. var loadedAWSSecret bool = false
  297. var awsSecret *AWSAccessKey = nil
  298. func (aws *AWS) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
  299. return ""
  300. }
  301. // KubeAttrConversion maps the k8s labels for region to an aws region
  302. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  303. operatingSystem = strings.ToLower(operatingSystem)
  304. region := locationToRegion[location]
  305. return region + "," + instanceType + "," + operatingSystem
  306. }
  // AwsSpotFeedInfo is the JSON payload decoded by UpdateConfig for a
  // "spotinfo" update. Note that AccountID is carried under the JSON name
  // "projectID".
  type AwsSpotFeedInfo struct {
  	BucketName       string `json:"bucketName"`
  	Prefix           string `json:"prefix"`
  	Region           string `json:"region"`
  	AccountID        string `json:"projectID"`
  	ServiceKeyName   string `json:"serviceKeyName"`
  	ServiceKeySecret string `json:"serviceKeySecret"`
  	SpotLabel        string `json:"spotLabel"`
  	SpotLabelValue   string `json:"spotLabelValue"`
  }
  // AwsAthenaInfo is the JSON payload decoded by UpdateConfig for an
  // "athenainfo" update. Note that AccountID is carried under the JSON name
  // "projectID".
  type AwsAthenaInfo struct {
  	AthenaBucketName string `json:"athenaBucketName"`
  	AthenaRegion     string `json:"athenaRegion"`
  	AthenaDatabase   string `json:"athenaDatabase"`
  	AthenaTable      string `json:"athenaTable"`
  	ServiceKeyName   string `json:"serviceKeyName"`
  	ServiceKeySecret string `json:"serviceKeySecret"`
  	AccountID        string `json:"projectID"`
  	MasterPayerARN   string `json:"masterPayerARN"`
  }
  327. func (aws *AWS) GetManagementPlatform() (string, error) {
  328. nodes := aws.Clientset.GetAllNodes()
  329. if len(nodes) > 0 {
  330. n := nodes[0]
  331. version := n.Status.NodeInfo.KubeletVersion
  332. if strings.Contains(version, "eks") {
  333. return "eks", nil
  334. }
  335. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  336. return "kops", nil
  337. }
  338. }
  339. return "", nil
  340. }
  341. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  342. c, err := aws.Config.GetCustomPricingData()
  343. if err != nil {
  344. return nil, err
  345. }
  346. if c.Discount == "" {
  347. c.Discount = "0%"
  348. }
  349. if c.NegotiatedDiscount == "" {
  350. c.NegotiatedDiscount = "0%"
  351. }
  352. if c.ShareTenancyCosts == "" {
  353. c.ShareTenancyCosts = defaultShareTenancyCost
  354. }
  355. return c, nil
  356. }
  357. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  358. return aws.Config.UpdateFromMap(a)
  359. }
  360. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  361. return aws.Config.Update(func(c *CustomPricing) error {
  362. if updateType == SpotInfoUpdateType {
  363. a := AwsSpotFeedInfo{}
  364. err := json.NewDecoder(r).Decode(&a)
  365. if err != nil {
  366. return err
  367. }
  368. c.ServiceKeyName = a.ServiceKeyName
  369. if a.ServiceKeySecret != "" {
  370. c.ServiceKeySecret = a.ServiceKeySecret
  371. }
  372. c.SpotDataPrefix = a.Prefix
  373. c.SpotDataBucket = a.BucketName
  374. c.ProjectID = a.AccountID
  375. c.SpotDataRegion = a.Region
  376. c.SpotLabel = a.SpotLabel
  377. c.SpotLabelValue = a.SpotLabelValue
  378. } else if updateType == AthenaInfoUpdateType {
  379. a := AwsAthenaInfo{}
  380. err := json.NewDecoder(r).Decode(&a)
  381. if err != nil {
  382. return err
  383. }
  384. c.AthenaBucketName = a.AthenaBucketName
  385. c.AthenaRegion = a.AthenaRegion
  386. c.AthenaDatabase = a.AthenaDatabase
  387. c.AthenaTable = a.AthenaTable
  388. c.ServiceKeyName = a.ServiceKeyName
  389. if a.ServiceKeySecret != "" {
  390. c.ServiceKeySecret = a.ServiceKeySecret
  391. }
  392. if a.MasterPayerARN != "" {
  393. c.MasterPayerARN = a.MasterPayerARN
  394. }
  395. c.AthenaProjectID = a.AccountID
  396. } else {
  397. a := make(map[string]interface{})
  398. err := json.NewDecoder(r).Decode(&a)
  399. if err != nil {
  400. return err
  401. }
  402. for k, v := range a {
  403. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  404. vstr, ok := v.(string)
  405. if ok {
  406. err := SetCustomPricingField(c, kUpper, vstr)
  407. if err != nil {
  408. return err
  409. }
  410. } else {
  411. return fmt.Errorf("type error while updating config for %s", kUpper)
  412. }
  413. }
  414. }
  415. if env.IsRemoteEnabled() {
  416. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  417. if err != nil {
  418. return err
  419. }
  420. }
  421. return nil
  422. })
  423. }
  // awsKey carries the node information needed to look up pricing: the
  // node's labels and provider ID, plus the label name/value pair that marks
  // a node as a spot instance.
  type awsKey struct {
  	SpotLabelName  string
  	SpotLabelValue string
  	Labels         map[string]string
  	ProviderID     string
  }
  430. func (k *awsKey) GPUType() string {
  431. return ""
  432. }
  433. func (k *awsKey) ID() string {
  434. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  435. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  436. if matchNum == 2 {
  437. return group
  438. }
  439. }
  440. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  441. return ""
  442. }
  443. func (k *awsKey) Features() string {
  444. instanceType, _ := util.GetInstanceType(k.Labels)
  445. operatingSystem, _ := util.GetOperatingSystem(k.Labels)
  446. region, _ := util.GetRegion(k.Labels)
  447. key := region + "," + instanceType + "," + operatingSystem
  448. usageType := PreemptibleType
  449. spotKey := key + "," + usageType
  450. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  451. return spotKey
  452. }
  453. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  454. return spotKey
  455. }
  456. return key
  457. }
  458. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  459. pricing, ok := aws.Pricing[pvk.Features()]
  460. if !ok {
  461. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  462. return &PV{}, nil
  463. }
  464. return pricing.PV, nil
  465. }
  // awsPVKey carries the persistent volume information needed to look up
  // storage pricing: the volume's labels, name and provider ID, its storage
  // class name and parameters, and a region to fall back on when the labels
  // do not provide one.
  type awsPVKey struct {
  	Labels                 map[string]string
  	StorageClassParameters map[string]string
  	StorageClassName       string
  	Name                   string
  	DefaultRegion          string
  	ProviderID             string
  }
  474. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  475. providerID := ""
  476. if pv.Spec.AWSElasticBlockStore != nil {
  477. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  478. } else if pv.Spec.CSI != nil {
  479. providerID = pv.Spec.CSI.VolumeHandle
  480. }
  481. return &awsPVKey{
  482. Labels: pv.Labels,
  483. StorageClassName: pv.Spec.StorageClassName,
  484. StorageClassParameters: parameters,
  485. Name: pv.Name,
  486. DefaultRegion: defaultRegion,
  487. ProviderID: providerID,
  488. }
  489. }
  490. func (key *awsPVKey) ID() string {
  491. return key.ProviderID
  492. }
  493. func (key *awsPVKey) GetStorageClass() string {
  494. return key.StorageClassName
  495. }
  496. func (key *awsPVKey) Features() string {
  497. storageClass := key.StorageClassParameters["type"]
  498. if storageClass == "standard" {
  499. storageClass = "gp2"
  500. }
  501. // Storage class names are generally EBS volume types (gp2)
  502. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  503. // Converts between the 2
  504. region, ok := util.GetRegion(key.Labels)
  505. if !ok {
  506. region = key.DefaultRegion
  507. }
  508. class, ok := volTypes[storageClass]
  509. if !ok {
  510. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  511. }
  512. return region + "," + class
  513. }
  514. // GetKey maps node labels to information needed to retrieve pricing data
  515. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  516. return &awsKey{
  517. SpotLabelName: aws.SpotLabelName,
  518. SpotLabelValue: aws.SpotLabelValue,
  519. Labels: labels,
  520. ProviderID: labels["providerID"],
  521. }
  522. }
  523. func (aws *AWS) isPreemptible(key string) bool {
  524. s := strings.Split(key, ",")
  525. if len(s) == 4 && s[3] == PreemptibleType {
  526. return true
  527. }
  528. return false
  529. }
  530. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  531. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  532. }
  533. // Use the pricing data from the current region. Fall back to using all region data if needed.
  534. func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, error) {
  535. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/"
  536. region := ""
  537. multiregion := false
  538. for _, n := range nodeList {
  539. labels := n.GetLabels()
  540. currentNodeRegion := ""
  541. if r, ok := util.GetRegion(labels); ok {
  542. currentNodeRegion = r
  543. // Switch to Chinese endpoint for regions with the Chinese prefix
  544. if strings.HasPrefix(currentNodeRegion, "cn-") {
  545. pricingURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/AmazonEC2/current/"
  546. }
  547. } else {
  548. multiregion = true // We weren't able to detect the node's region, so pull all data.
  549. break
  550. }
  551. if region == "" { // We haven't set a region yet
  552. region = currentNodeRegion
  553. } else if region != "" && currentNodeRegion != region { // If two nodes have different regions here, we'll need to fetch all pricing data.
  554. multiregion = true
  555. break
  556. }
  557. }
  558. // Chinese multiregion endpoint only contains data for Chinese regions and Chinese regions are excluded from other endpoint
  559. if region != "" && !multiregion {
  560. pricingURL += region + "/"
  561. }
  562. pricingURL += "index.json"
  563. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  564. resp, err := http.Get(pricingURL)
  565. if err != nil {
  566. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  567. return nil, pricingURL, err
  568. }
  569. return resp, pricingURL, err
  570. }
  571. // DownloadPricingData fetches data from the AWS Pricing API
  572. func (aws *AWS) DownloadPricingData() error {
  573. aws.DownloadPricingDataLock.Lock()
  574. defer aws.DownloadPricingDataLock.Unlock()
  575. if aws.ServiceAccountChecks == nil {
  576. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  577. }
  578. c, err := aws.Config.GetCustomPricingData()
  579. if err != nil {
  580. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  581. }
  582. aws.BaseCPUPrice = c.CPU
  583. aws.BaseRAMPrice = c.RAM
  584. aws.BaseGPUPrice = c.GPU
  585. aws.BaseSpotCPUPrice = c.SpotCPU
  586. aws.BaseSpotRAMPrice = c.SpotRAM
  587. aws.BaseSpotGPUPrice = c.SpotGPU
  588. aws.SpotLabelName = c.SpotLabel
  589. aws.SpotLabelValue = c.SpotLabelValue
  590. aws.SpotDataBucket = c.SpotDataBucket
  591. aws.SpotDataPrefix = c.SpotDataPrefix
  592. aws.ProjectID = c.ProjectID
  593. aws.SpotDataRegion = c.SpotDataRegion
  594. aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
  595. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  596. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  597. }
  598. nodeList := aws.Clientset.GetAllNodes()
  599. inputkeys := make(map[string]bool)
  600. for _, n := range nodeList {
  601. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  602. aws.clusterManagementPrice = 0.10
  603. aws.clusterProvisioner = "EKS"
  604. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  605. aws.clusterProvisioner = "KOPS"
  606. }
  607. labels := n.GetObjectMeta().GetLabels()
  608. key := aws.GetKey(labels, n)
  609. inputkeys[key.Features()] = true
  610. }
  611. pvList := aws.Clientset.GetAllPersistentVolumes()
  612. storageClasses := aws.Clientset.GetAllStorageClasses()
  613. storageClassMap := make(map[string]map[string]string)
  614. for _, storageClass := range storageClasses {
  615. params := storageClass.Parameters
  616. storageClassMap[storageClass.ObjectMeta.Name] = params
  617. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  618. storageClassMap["default"] = params
  619. storageClassMap[""] = params
  620. }
  621. }
  622. pvkeys := make(map[string]PVKey)
  623. for _, pv := range pvList {
  624. params, ok := storageClassMap[pv.Spec.StorageClassName]
  625. if !ok {
  626. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  627. continue
  628. }
  629. key := aws.GetPVKey(pv, params, "")
  630. pvkeys[key.Features()] = key
  631. }
  632. // RIDataRunning establishes the existance of the goroutine. Since it's possible we
  633. // run multiple downloads, we don't want to create multiple go routines if one already exists
  634. if !aws.RIDataRunning && c.AthenaBucketName != "" {
  635. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  636. if err != nil {
  637. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  638. } else { // If we make one successful run, check on new reservation data every hour
  639. go func() {
  640. defer errors.HandlePanic()
  641. aws.RIDataRunning = true
  642. for {
  643. klog.Infof("Reserved Instance watcher running... next update in 1h")
  644. time.Sleep(time.Hour)
  645. err := aws.GetReservationDataFromAthena()
  646. if err != nil {
  647. klog.Infof("Error updating RI data: %s", err.Error())
  648. }
  649. }
  650. }()
  651. }
  652. }
  653. if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
  654. err = aws.GetSavingsPlanDataFromAthena()
  655. if err != nil {
  656. klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
  657. } else {
  658. go func() {
  659. defer errors.HandlePanic()
  660. aws.SavingsPlanDataRunning = true
  661. for {
  662. klog.Infof("Savings Plan watcher running... next update in 1h")
  663. time.Sleep(time.Hour)
  664. err := aws.GetSavingsPlanDataFromAthena()
  665. if err != nil {
  666. klog.Infof("Error updating Savings Plan data: %s", err.Error())
  667. }
  668. }
  669. }()
  670. }
  671. }
  672. aws.Pricing = make(map[string]*AWSProductTerms)
  673. aws.ValidPricingKeys = make(map[string]bool)
  674. skusToKeys := make(map[string]string)
  675. resp, pricingURL, err := aws.getRegionPricing(nodeList)
  676. if err != nil {
  677. return err
  678. }
  679. dec := json.NewDecoder(resp.Body)
  680. for {
  681. t, err := dec.Token()
  682. if err == io.EOF {
  683. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  684. break
  685. } else if err != nil {
  686. klog.V(2).Infof("error parsing response json %v", resp.Body)
  687. break
  688. }
  689. if t == "products" {
  690. _, err := dec.Token() // this should parse the opening "{""
  691. if err != nil {
  692. return err
  693. }
  694. for dec.More() {
  695. _, err := dec.Token() // the sku token
  696. if err != nil {
  697. return err
  698. }
  699. product := &AWSProduct{}
  700. err = dec.Decode(&product)
  701. if err != nil {
  702. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  703. break
  704. }
  705. if product.Attributes.PreInstalledSw == "NA" &&
  706. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
  707. product.Attributes.CapacityStatus == "Used" {
  708. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  709. spotKey := key + ",preemptible"
  710. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  711. productTerms := &AWSProductTerms{
  712. Sku: product.Sku,
  713. Memory: product.Attributes.Memory,
  714. Storage: product.Attributes.Storage,
  715. VCpu: product.Attributes.VCpu,
  716. GPU: product.Attributes.GPU,
  717. }
  718. aws.Pricing[key] = productTerms
  719. aws.Pricing[spotKey] = productTerms
  720. skusToKeys[product.Sku] = key
  721. }
  722. aws.ValidPricingKeys[key] = true
  723. aws.ValidPricingKeys[spotKey] = true
  724. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  725. // UsageTypes may be prefixed with a region code - we're removing this when using
  726. // volTypes to keep lookups generic
  727. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  728. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  729. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  730. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  731. spotKey := key + ",preemptible"
  732. pv := &PV{
  733. Class: volTypes[usageTypeNoRegion],
  734. Region: locationToRegion[product.Attributes.Location],
  735. }
  736. productTerms := &AWSProductTerms{
  737. Sku: product.Sku,
  738. PV: pv,
  739. }
  740. aws.Pricing[key] = productTerms
  741. aws.Pricing[spotKey] = productTerms
  742. skusToKeys[product.Sku] = key
  743. aws.ValidPricingKeys[key] = true
  744. aws.ValidPricingKeys[spotKey] = true
  745. }
  746. }
  747. }
  748. if t == "terms" {
  749. _, err := dec.Token() // this should parse the opening "{""
  750. if err != nil {
  751. return err
  752. }
  753. termType, err := dec.Token()
  754. if err != nil {
  755. return err
  756. }
  757. if termType == "OnDemand" {
  758. _, err := dec.Token()
  759. if err != nil { // again, should parse an opening "{"
  760. return err
  761. }
  762. for dec.More() {
  763. sku, err := dec.Token()
  764. if err != nil {
  765. return err
  766. }
  767. _, err = dec.Token() // another opening "{"
  768. if err != nil {
  769. return err
  770. }
  771. skuOnDemand, err := dec.Token()
  772. if err != nil {
  773. return err
  774. }
  775. offerTerm := &AWSOfferTerm{}
  776. err = dec.Decode(&offerTerm)
  777. if err != nil {
  778. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  779. }
  780. key, ok := skusToKeys[sku.(string)]
  781. spotKey := key + ",preemptible"
  782. if ok {
  783. aws.Pricing[key].OnDemand = offerTerm
  784. aws.Pricing[spotKey].OnDemand = offerTerm
  785. var cost string
  786. if sku.(string)+OnDemandRateCode == skuOnDemand {
  787. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  788. } else if sku.(string)+OnDemandRateCodeCn == skuOnDemand {
  789. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCodeCn+HourlyRateCodeCn].PricePerUnit.CNY
  790. }
  791. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  792. // If the specific UsageType is the per IO cost used on io1 volumes
  793. // we need to add the per IO cost to the io1 PV cost
  794. // Add the per IO cost to the PV object for the io1 volume type
  795. aws.Pricing[key].PV.CostPerIO = cost
  796. } else if strings.Contains(key, "EBS:Volume") {
  797. // If volume, we need to get hourly cost and add it to the PV object
  798. costFloat, _ := strconv.ParseFloat(cost, 64)
  799. hourlyPrice := costFloat / 730
  800. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  801. }
  802. }
  803. _, err = dec.Token()
  804. if err != nil {
  805. return err
  806. }
  807. }
  808. _, err = dec.Token()
  809. if err != nil {
  810. return err
  811. }
  812. }
  813. }
  814. }
  815. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  816. // Always run spot pricing refresh when performing download
  817. aws.refreshSpotPricing(true)
  818. // Only start a single refresh goroutine
  819. if !aws.SpotRefreshRunning {
  820. aws.SpotRefreshRunning = true
  821. go func() {
  822. defer errors.HandlePanic()
  823. for {
  824. klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  825. time.Sleep(SpotRefreshDuration)
  826. // Reoccurring refresh checks update times
  827. aws.refreshSpotPricing(false)
  828. }
  829. }()
  830. }
  831. return nil
  832. }
  833. func (aws *AWS) refreshSpotPricing(force bool) {
  834. aws.SpotPricingLock.Lock()
  835. defer aws.SpotPricingLock.Unlock()
  836. now := time.Now().UTC()
  837. updateTime := now.Add(-SpotRefreshDuration)
  838. // Return if there was an update time set and an hour hasn't elapsed
  839. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  840. return
  841. }
  842. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
  843. if err != nil {
  844. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  845. aws.SpotPricingError = err
  846. return
  847. }
  848. aws.SpotPricingError = nil
  849. // update time last updated
  850. aws.SpotPricingUpdatedAt = &now
  851. aws.SpotPricingByInstanceID = sp
  852. }
  853. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  854. func (aws *AWS) NetworkPricing() (*Network, error) {
  855. cpricing, err := aws.Config.GetCustomPricingData()
  856. if err != nil {
  857. return nil, err
  858. }
  859. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  860. if err != nil {
  861. return nil, err
  862. }
  863. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  864. if err != nil {
  865. return nil, err
  866. }
  867. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  868. if err != nil {
  869. return nil, err
  870. }
  871. return &Network{
  872. ZoneNetworkEgressCost: znec,
  873. RegionNetworkEgressCost: rnec,
  874. InternetNetworkEgressCost: inec,
  875. }, nil
  876. }
  877. func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
  878. fffrc := 0.025
  879. afrc := 0.010
  880. lbidc := 0.008
  881. numForwardingRules := 1.0
  882. dataIngressGB := 0.0
  883. var totalCost float64
  884. if numForwardingRules < 5 {
  885. totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
  886. } else {
  887. totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
  888. }
  889. return &LoadBalancer{
  890. Cost: totalCost,
  891. }, nil
  892. }
  893. // AllNodePricing returns all the billing data fetched.
  894. func (aws *AWS) AllNodePricing() (interface{}, error) {
  895. aws.DownloadPricingDataLock.RLock()
  896. defer aws.DownloadPricingDataLock.RUnlock()
  897. return aws.Pricing, nil
  898. }
  899. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  900. aws.SpotPricingLock.RLock()
  901. defer aws.SpotPricingLock.RUnlock()
  902. info, ok := aws.SpotPricingByInstanceID[instanceID]
  903. return info, ok
  904. }
  905. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  906. aws.RIDataLock.RLock()
  907. defer aws.RIDataLock.RUnlock()
  908. data, ok := aws.RIPricingByInstanceID[instanceID]
  909. return data, ok
  910. }
  911. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  912. aws.SavingsPlanDataLock.RLock()
  913. defer aws.SavingsPlanDataLock.RUnlock()
  914. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  915. return data, ok
  916. }
  917. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  918. key := k.Features()
  919. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  920. var spotcost string
  921. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  922. arr := strings.Split(spotInfo.Charge, " ")
  923. if len(arr) == 2 {
  924. spotcost = arr[0]
  925. } else {
  926. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  927. }
  928. return &Node{
  929. Cost: spotcost,
  930. VCPU: terms.VCpu,
  931. RAM: terms.Memory,
  932. GPU: terms.GPU,
  933. Storage: terms.Storage,
  934. BaseCPUPrice: aws.BaseCPUPrice,
  935. BaseRAMPrice: aws.BaseRAMPrice,
  936. BaseGPUPrice: aws.BaseGPUPrice,
  937. UsageType: PreemptibleType,
  938. }, nil
  939. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  940. log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
  941. return &Node{
  942. VCPU: terms.VCpu,
  943. VCPUCost: aws.BaseSpotCPUPrice,
  944. RAM: terms.Memory,
  945. GPU: terms.GPU,
  946. Storage: terms.Storage,
  947. BaseCPUPrice: aws.BaseCPUPrice,
  948. BaseRAMPrice: aws.BaseRAMPrice,
  949. BaseGPUPrice: aws.BaseGPUPrice,
  950. UsageType: PreemptibleType,
  951. }, nil
  952. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  953. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  954. return &Node{
  955. Cost: strCost,
  956. VCPU: terms.VCpu,
  957. RAM: terms.Memory,
  958. GPU: terms.GPU,
  959. Storage: terms.Storage,
  960. BaseCPUPrice: aws.BaseCPUPrice,
  961. BaseRAMPrice: aws.BaseRAMPrice,
  962. BaseGPUPrice: aws.BaseGPUPrice,
  963. UsageType: usageType,
  964. }, nil
  965. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  966. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  967. return &Node{
  968. Cost: strCost,
  969. VCPU: terms.VCpu,
  970. RAM: terms.Memory,
  971. GPU: terms.GPU,
  972. Storage: terms.Storage,
  973. BaseCPUPrice: aws.BaseCPUPrice,
  974. BaseRAMPrice: aws.BaseRAMPrice,
  975. BaseGPUPrice: aws.BaseGPUPrice,
  976. UsageType: usageType,
  977. }, nil
  978. }
  979. var cost string
  980. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  981. if ok {
  982. cost = c.PricePerUnit.USD
  983. } else {
  984. // Check for Chinese pricing before throwing error
  985. c, ok = terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCodeCn+HourlyRateCodeCn]
  986. if ok {
  987. cost = c.PricePerUnit.CNY
  988. } else {
  989. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  990. }
  991. }
  992. return &Node{
  993. Cost: cost,
  994. VCPU: terms.VCpu,
  995. RAM: terms.Memory,
  996. GPU: terms.GPU,
  997. Storage: terms.Storage,
  998. BaseCPUPrice: aws.BaseCPUPrice,
  999. BaseRAMPrice: aws.BaseRAMPrice,
  1000. BaseGPUPrice: aws.BaseGPUPrice,
  1001. UsageType: usageType,
  1002. }, nil
  1003. }
  1004. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  1005. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  1006. aws.DownloadPricingDataLock.RLock()
  1007. defer aws.DownloadPricingDataLock.RUnlock()
  1008. key := k.Features()
  1009. usageType := "ondemand"
  1010. if aws.isPreemptible(key) {
  1011. usageType = PreemptibleType
  1012. }
  1013. terms, ok := aws.Pricing[key]
  1014. if ok {
  1015. return aws.createNode(terms, usageType, k)
  1016. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  1017. aws.DownloadPricingDataLock.RUnlock()
  1018. err := aws.DownloadPricingData()
  1019. aws.DownloadPricingDataLock.RLock()
  1020. if err != nil {
  1021. return &Node{
  1022. Cost: aws.BaseCPUPrice,
  1023. BaseCPUPrice: aws.BaseCPUPrice,
  1024. BaseRAMPrice: aws.BaseRAMPrice,
  1025. BaseGPUPrice: aws.BaseGPUPrice,
  1026. UsageType: usageType,
  1027. UsesBaseCPUPrice: true,
  1028. }, err
  1029. }
  1030. terms, termsOk := aws.Pricing[key]
  1031. if !termsOk {
  1032. return &Node{
  1033. Cost: aws.BaseCPUPrice,
  1034. BaseCPUPrice: aws.BaseCPUPrice,
  1035. BaseRAMPrice: aws.BaseRAMPrice,
  1036. BaseGPUPrice: aws.BaseGPUPrice,
  1037. UsageType: usageType,
  1038. UsesBaseCPUPrice: true,
  1039. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  1040. }
  1041. return aws.createNode(terms, usageType, k)
  1042. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  1043. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  1044. }
  1045. }
  1046. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  1047. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  1048. defaultClusterName := "AWS Cluster #1"
  1049. c, err := awsProvider.GetConfig()
  1050. if err != nil {
  1051. return nil, err
  1052. }
  1053. remoteEnabled := env.IsRemoteEnabled()
  1054. if c.ClusterName != "" {
  1055. m := make(map[string]string)
  1056. m["name"] = c.ClusterName
  1057. m["provider"] = "AWS"
  1058. m["id"] = env.GetClusterID()
  1059. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1060. m["provisioner"] = awsProvider.clusterProvisioner
  1061. return m, nil
  1062. }
  1063. makeStructure := func(clusterName string) (map[string]string, error) {
  1064. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  1065. m := make(map[string]string)
  1066. m["name"] = clusterName
  1067. m["provider"] = "AWS"
  1068. m["id"] = env.GetClusterID()
  1069. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1070. return m, nil
  1071. }
  1072. maybeClusterId := env.GetAWSClusterID()
  1073. if len(maybeClusterId) != 0 {
  1074. return makeStructure(maybeClusterId)
  1075. }
  1076. // TODO: This should be cached, it can take a long time to hit the API
  1077. //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  1078. //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  1079. //klog.Infof("nodelist get here %s", time.Now())
  1080. //nodeList := awsProvider.Clientset.GetAllNodes()
  1081. //klog.Infof("nodelist done here %s", time.Now())
  1082. /*for _, n := range nodeList {
  1083. region := ""
  1084. instanceId := ""
  1085. providerId := n.Spec.ProviderID
  1086. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  1087. if matchNum == 1 {
  1088. region = group
  1089. } else if matchNum == 2 {
  1090. instanceId = group
  1091. }
  1092. }
  1093. if len(instanceId) == 0 {
  1094. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  1095. continue
  1096. }
  1097. c := &aws.Config{
  1098. Region: aws.String(region),
  1099. }
  1100. s := session.Must(session.NewSession(c))
  1101. ec2Svc := ec2.New(s)
  1102. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  1103. InstanceIds: []*string{
  1104. aws.String(instanceId),
  1105. },
  1106. })
  1107. if diErr != nil {
  1108. klog.Infof("Error describing instances: %s", diErr)
  1109. continue
  1110. }
  1111. if len(di.Reservations) != 1 {
  1112. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  1113. continue
  1114. }
  1115. res := di.Reservations[0]
  1116. if len(res.Instances) != 1 {
  1117. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  1118. continue
  1119. }
  1120. inst := res.Instances[0]
  1121. for _, tag := range inst.Tags {
  1122. tagKey := *tag.Key
  1123. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  1124. if matchNum != 1 {
  1125. continue
  1126. }
  1127. return makeStructure(group)
  1128. }
  1129. }
  1130. }*/
  1131. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  1132. return makeStructure(defaultClusterName)
  1133. }
  1134. // updates the authentication to the latest values (via config or secret)
  1135. func (aws *AWS) ConfigureAuth() error {
  1136. c, err := aws.Config.GetCustomPricingData()
  1137. if err != nil {
  1138. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  1139. }
  1140. return aws.ConfigureAuthWith(c)
  1141. }
  1142. // updates the authentication to the latest values (via config or secret)
  1143. func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
  1144. accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
  1145. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1146. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1147. if err != nil {
  1148. return err
  1149. }
  1150. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1151. if err != nil {
  1152. return err
  1153. }
  1154. }
  1155. return nil
  1156. }
  1157. // Gets the aws key id and secret
  1158. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  1159. if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
  1160. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1161. }
  1162. // 1. Check config values first (set from frontend UI)
  1163. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  1164. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1165. Message: "AWS ServiceKey exists",
  1166. Status: true,
  1167. }
  1168. return cp.ServiceKeyName, cp.ServiceKeySecret
  1169. }
  1170. // 2. Check for secret
  1171. s, _ := aws.loadAWSAuthSecret(forceReload)
  1172. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1173. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1174. Message: "AWS ServiceKey exists",
  1175. Status: true,
  1176. }
  1177. return s.AccessKeyID, s.SecretAccessKey
  1178. }
  1179. // 3. Fall back to env vars
  1180. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1181. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1182. Message: "AWS ServiceKey exists",
  1183. Status: false,
  1184. }
  1185. } else {
  1186. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1187. Message: "AWS ServiceKey exists",
  1188. Status: true,
  1189. }
  1190. }
  1191. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1192. }
  1193. // Load once and cache the result (even on failure). This is an install time secret, so
  1194. // we don't expect the secret to change. If it does, however, we can force reload using
  1195. // the input parameter.
  1196. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1197. if !force && loadedAWSSecret {
  1198. return awsSecret, nil
  1199. }
  1200. loadedAWSSecret = true
  1201. exists, err := fileutil.FileExists(authSecretPath)
  1202. if !exists || err != nil {
  1203. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1204. }
  1205. result, err := ioutil.ReadFile(authSecretPath)
  1206. if err != nil {
  1207. return nil, err
  1208. }
  1209. var ak AWSAccessKey
  1210. err = json.Unmarshal(result, &ak)
  1211. if err != nil {
  1212. return nil, err
  1213. }
  1214. awsSecret = &ak
  1215. return awsSecret, nil
  1216. }
  1217. func getClusterConfig(ccFile string) (map[string]string, error) {
  1218. clusterConfig, err := os.Open(ccFile)
  1219. if err != nil {
  1220. return nil, err
  1221. }
  1222. defer clusterConfig.Close()
  1223. b, err := ioutil.ReadAll(clusterConfig)
  1224. if err != nil {
  1225. return nil, err
  1226. }
  1227. var clusterConf map[string]string
  1228. err = json.Unmarshal([]byte(b), &clusterConf)
  1229. if err != nil {
  1230. return nil, err
  1231. }
  1232. return clusterConf, nil
  1233. }
  1234. func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
  1235. sess, err := session.NewSession(&aws.Config{
  1236. Region: aws.String(region),
  1237. Credentials: credentials.NewEnvCredentials(),
  1238. })
  1239. if err != nil {
  1240. return nil, err
  1241. }
  1242. ec2Svc := ec2.New(sess)
  1243. return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
  1244. }
  1245. func (a *AWS) GetAddresses() ([]byte, error) {
  1246. a.ConfigureAuth() // load authentication data into env vars
  1247. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1248. errorCh := make(chan error, len(awsRegions))
  1249. var wg sync.WaitGroup
  1250. wg.Add(len(awsRegions))
  1251. // Get volumes from each AWS region
  1252. for _, r := range awsRegions {
  1253. // Fetch IP address response and send results and errors to their
  1254. // respective channels
  1255. go func(region string) {
  1256. defer wg.Done()
  1257. defer errors.HandlePanic()
  1258. // Query for first page of volume results
  1259. resp, err := a.getAddressesForRegion(region)
  1260. if err != nil {
  1261. if aerr, ok := err.(awserr.Error); ok {
  1262. switch aerr.Code() {
  1263. default:
  1264. errorCh <- aerr
  1265. }
  1266. return
  1267. } else {
  1268. errorCh <- err
  1269. return
  1270. }
  1271. }
  1272. addressCh <- resp
  1273. }(r)
  1274. }
  1275. // Close the result channels after everything has been sent
  1276. go func() {
  1277. defer errors.HandlePanic()
  1278. wg.Wait()
  1279. close(errorCh)
  1280. close(addressCh)
  1281. }()
  1282. addresses := []*ec2.Address{}
  1283. for adds := range addressCh {
  1284. addresses = append(addresses, adds.Addresses...)
  1285. }
  1286. errors := []error{}
  1287. for err := range errorCh {
  1288. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1289. errors = append(errors, err)
  1290. }
  1291. // Return error if no addresses are returned
  1292. if len(errors) > 0 && len(addresses) == 0 {
  1293. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
  1294. }
  1295. // Format the response this way to match the JSON-encoded formatting of a single response
  1296. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1297. // a "Addresss" key at the top level.
  1298. return json.Marshal(map[string][]*ec2.Address{
  1299. "Addresses": addresses,
  1300. })
  1301. }
  1302. func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1303. sess, err := session.NewSession(&aws.Config{
  1304. Region: aws.String(region),
  1305. Credentials: credentials.NewEnvCredentials(),
  1306. })
  1307. if err != nil {
  1308. return nil, err
  1309. }
  1310. ec2Svc := ec2.New(sess)
  1311. return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
  1312. MaxResults: &maxResults,
  1313. NextToken: nextToken,
  1314. })
  1315. }
  1316. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1317. func (a *AWS) GetDisks() ([]byte, error) {
  1318. a.ConfigureAuth() // load authentication data into env vars
  1319. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1320. errorCh := make(chan error, len(awsRegions))
  1321. var wg sync.WaitGroup
  1322. wg.Add(len(awsRegions))
  1323. // Get volumes from each AWS region
  1324. for _, r := range awsRegions {
  1325. // Fetch volume response and send results and errors to their
  1326. // respective channels
  1327. go func(region string) {
  1328. defer wg.Done()
  1329. defer errors.HandlePanic()
  1330. // Query for first page of volume results
  1331. resp, err := a.getDisksForRegion(region, 1000, nil)
  1332. if err != nil {
  1333. if aerr, ok := err.(awserr.Error); ok {
  1334. switch aerr.Code() {
  1335. default:
  1336. errorCh <- aerr
  1337. }
  1338. return
  1339. } else {
  1340. errorCh <- err
  1341. return
  1342. }
  1343. }
  1344. volumeCh <- resp
  1345. // A NextToken indicates more pages of results. Keep querying
  1346. // until all pages are retrieved.
  1347. for resp.NextToken != nil {
  1348. resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
  1349. if err != nil {
  1350. if aerr, ok := err.(awserr.Error); ok {
  1351. switch aerr.Code() {
  1352. default:
  1353. errorCh <- aerr
  1354. }
  1355. return
  1356. } else {
  1357. errorCh <- err
  1358. return
  1359. }
  1360. }
  1361. volumeCh <- resp
  1362. }
  1363. }(r)
  1364. }
  1365. // Close the result channels after everything has been sent
  1366. go func() {
  1367. defer errors.HandlePanic()
  1368. wg.Wait()
  1369. close(errorCh)
  1370. close(volumeCh)
  1371. }()
  1372. volumes := []*ec2.Volume{}
  1373. for vols := range volumeCh {
  1374. volumes = append(volumes, vols.Volumes...)
  1375. }
  1376. errors := []error{}
  1377. for err := range errorCh {
  1378. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1379. errors = append(errors, err)
  1380. }
  1381. // Return error if no volumes are returned
  1382. if len(errors) > 0 && len(volumes) == 0 {
  1383. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
  1384. }
  1385. // Format the response this way to match the JSON-encoded formatting of a single response
  1386. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1387. // a "Volumes" key at the top level.
  1388. return json.Marshal(map[string][]*ec2.Volume{
  1389. "Volumes": volumes,
  1390. })
  1391. }
  1392. func generateAWSGroupBy(lastIdx int) string {
  1393. sequence := []string{}
  1394. for i := 1; i < lastIdx+1; i++ {
  1395. sequence = append(sequence, strconv.Itoa(i))
  1396. }
  1397. return strings.Join(sequence, ",")
  1398. }
  1399. func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput, *athena.Athena, error) {
  1400. customPricing, err := a.GetConfig()
  1401. if err != nil {
  1402. return nil, nil, err
  1403. }
  1404. a.ConfigureAuthWith(customPricing)
  1405. region := aws.String(customPricing.AthenaRegion)
  1406. resultsBucket := customPricing.AthenaBucketName
  1407. database := customPricing.AthenaDatabase
  1408. c := &aws.Config{
  1409. Region: region,
  1410. STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
  1411. }
  1412. s := session.Must(session.NewSession(c))
  1413. svc := athena.New(s)
  1414. if customPricing.MasterPayerARN != "" {
  1415. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1416. svc = athena.New(s, &aws.Config{
  1417. Region: region,
  1418. Credentials: creds,
  1419. })
  1420. }
  1421. var e athena.StartQueryExecutionInput
  1422. var r athena.ResultConfiguration
  1423. r.SetOutputLocation(resultsBucket)
  1424. e.SetResultConfiguration(&r)
  1425. e.SetQueryString(query)
  1426. var q athena.QueryExecutionContext
  1427. q.SetDatabase(database)
  1428. e.SetQueryExecutionContext(&q)
  1429. res, err := svc.StartQueryExecution(&e)
  1430. if err != nil {
  1431. return nil, svc, err
  1432. }
  1433. klog.V(2).Infof("StartQueryExecution result:")
  1434. klog.V(2).Infof(res.GoString())
  1435. var qri athena.GetQueryExecutionInput
  1436. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1437. var qrop *athena.GetQueryExecutionOutput
  1438. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1439. for {
  1440. qrop, err = svc.GetQueryExecution(&qri)
  1441. if err != nil {
  1442. return nil, svc, err
  1443. }
  1444. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1445. break
  1446. }
  1447. time.Sleep(duration)
  1448. }
  1449. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1450. var ip athena.GetQueryResultsInput
  1451. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1452. return &ip, svc, nil
  1453. } else {
  1454. return nil, svc, fmt.Errorf("No results available for %s", query)
  1455. }
  1456. }
  1457. func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
  1458. customPricing, err := a.GetConfig()
  1459. if err != nil {
  1460. return nil, err
  1461. }
  1462. a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
  1463. region := aws.String(customPricing.AthenaRegion)
  1464. resultsBucket := customPricing.AthenaBucketName
  1465. database := customPricing.AthenaDatabase
  1466. c := &aws.Config{
  1467. Region: region,
  1468. STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
  1469. }
  1470. s := session.Must(session.NewSession(c))
  1471. svc := athena.New(s)
  1472. if customPricing.MasterPayerARN != "" {
  1473. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1474. svc = athena.New(s, &aws.Config{
  1475. Region: region,
  1476. Credentials: creds,
  1477. })
  1478. }
  1479. var e athena.StartQueryExecutionInput
  1480. var r athena.ResultConfiguration
  1481. r.SetOutputLocation(resultsBucket)
  1482. e.SetResultConfiguration(&r)
  1483. e.SetQueryString(query)
  1484. var q athena.QueryExecutionContext
  1485. q.SetDatabase(database)
  1486. e.SetQueryExecutionContext(&q)
  1487. res, err := svc.StartQueryExecution(&e)
  1488. if err != nil {
  1489. return nil, err
  1490. }
  1491. klog.V(2).Infof("StartQueryExecution result:")
  1492. klog.V(2).Infof(res.GoString())
  1493. var qri athena.GetQueryExecutionInput
  1494. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1495. var qrop *athena.GetQueryExecutionOutput
  1496. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1497. for {
  1498. qrop, err = svc.GetQueryExecution(&qri)
  1499. if err != nil {
  1500. return nil, err
  1501. }
  1502. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1503. break
  1504. }
  1505. time.Sleep(duration)
  1506. }
  1507. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1508. var ip athena.GetQueryResultsInput
  1509. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1510. return svc.GetQueryResults(&ip)
  1511. } else {
  1512. return nil, fmt.Errorf("No results available for %s", query)
  1513. }
  1514. }
  // SavingsPlanData is one savings-plan-covered usage row read from Athena. It
  // is stored per resource in AWS.SavingsPlanDataByInstanceID.
  type SavingsPlanData struct {
  	// ResourceID is the line_item_resource_id column of the row.
  	ResourceID string
  	// EffectiveCost is the savings_plan_savings_plan_rate column parsed as a float.
  	EffectiveCost float64
  	// SavingsPlanARN is the savings_plan_savings_plan_a_r_n column of the row.
  	SavingsPlanARN string
  	// MostRecentDate is the line_item_usage_start_date column of the row.
  	MostRecentDate string
  }
  1521. func (a *AWS) GetSavingsPlanDataFromAthena() error {
  1522. cfg, err := a.GetConfig()
  1523. if err != nil {
  1524. return err
  1525. }
  1526. if cfg.AthenaBucketName == "" {
  1527. return fmt.Errorf("No Athena Bucket configured")
  1528. }
  1529. if a.SavingsPlanDataByInstanceID == nil {
  1530. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1531. }
  1532. tNow := time.Now()
  1533. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1534. start := tOneDayAgo.Format("2006-01-02")
  1535. end := tNow.Format("2006-01-02")
  1536. // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
  1537. //
  1538. q := `SELECT
  1539. line_item_usage_start_date,
  1540. savings_plan_savings_plan_a_r_n,
  1541. line_item_resource_id,
  1542. savings_plan_savings_plan_rate
  1543. FROM %s as cost_data
  1544. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1545. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1546. line_item_usage_start_date DESC`
  1547. page := 0
  1548. processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
  1549. a.SavingsPlanDataLock.Lock()
  1550. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
  1551. mostRecentDate := ""
  1552. iter := op.ResultSet.Rows
  1553. if page == 0 && len(iter) > 0 {
  1554. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1555. }
  1556. page++
  1557. for _, r := range iter {
  1558. d := *r.Data[0].VarCharValue
  1559. if mostRecentDate == "" {
  1560. mostRecentDate = d
  1561. } else if mostRecentDate != d { // Get all most recent assignments
  1562. break
  1563. }
  1564. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1565. if err != nil {
  1566. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1567. }
  1568. r := &SavingsPlanData{
  1569. ResourceID: *r.Data[2].VarCharValue,
  1570. EffectiveCost: cost,
  1571. SavingsPlanARN: *r.Data[1].VarCharValue,
  1572. MostRecentDate: d,
  1573. }
  1574. a.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1575. }
  1576. klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
  1577. for k, r := range a.SavingsPlanDataByInstanceID {
  1578. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1579. }
  1580. a.SavingsPlanDataLock.Unlock()
  1581. return true
  1582. }
  1583. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1584. klog.V(3).Infof("Running Query: %s", query)
  1585. ip, svc, err := a.QueryAthenaPaginated(query)
  1586. if err != nil {
  1587. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1588. }
  1589. athenaErr := svc.GetQueryResultsPages(ip, processResults)
  1590. if athenaErr != nil {
  1591. return athenaErr
  1592. }
  1593. return nil
  1594. }
  // RIData is one reserved-instance usage row read from Athena. It is stored per
  // resource in AWS.RIPricingByInstanceID.
  type RIData struct {
  	// ResourceID is the line_item_resource_id column of the row.
  	ResourceID string
  	// EffectiveCost is the reservation_effective_cost column parsed as a float.
  	EffectiveCost float64
  	// ReservationARN is the reservation_reservation_a_r_n column of the row.
  	ReservationARN string
  	// MostRecentDate is the line_item_usage_start_date column of the row.
  	MostRecentDate string
  }
  1601. func (a *AWS) GetReservationDataFromAthena() error {
  1602. cfg, err := a.GetConfig()
  1603. if err != nil {
  1604. return err
  1605. }
  1606. if cfg.AthenaBucketName == "" {
  1607. return fmt.Errorf("No Athena Bucket configured")
  1608. }
  1609. // Query for all column names in advance in order to validate configured
  1610. // label columns
  1611. columns, _ := a.ShowAthenaColumns()
  1612. if columns["reservation_reservation_a_r_n"] && columns["reservation_effective_cost"] {
  1613. if a.RIPricingByInstanceID == nil {
  1614. a.RIPricingByInstanceID = make(map[string]*RIData)
  1615. }
  1616. tNow := time.Now()
  1617. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1618. start := tOneDayAgo.Format("2006-01-02")
  1619. end := tNow.Format("2006-01-02")
  1620. q := `SELECT
  1621. line_item_usage_start_date,
  1622. reservation_reservation_a_r_n,
  1623. line_item_resource_id,
  1624. reservation_effective_cost
  1625. FROM %s as cost_data
  1626. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1627. AND reservation_reservation_a_r_n <> '' ORDER BY
  1628. line_item_usage_start_date DESC`
  1629. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1630. op, err := a.QueryAthenaBillingData(query)
  1631. if err != nil {
  1632. a.RIPricingError = err
  1633. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1634. }
  1635. a.RIPricingError = nil
  1636. klog.Infof("Fetching RI data...")
  1637. if len(op.ResultSet.Rows) > 1 {
  1638. a.RIDataLock.Lock()
  1639. mostRecentDate := ""
  1640. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1641. d := *r.Data[0].VarCharValue
  1642. if mostRecentDate == "" {
  1643. mostRecentDate = d
  1644. } else if mostRecentDate != d { // Get all most recent assignments
  1645. break
  1646. }
  1647. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1648. if err != nil {
  1649. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1650. }
  1651. r := &RIData{
  1652. ResourceID: *r.Data[2].VarCharValue,
  1653. EffectiveCost: cost,
  1654. ReservationARN: *r.Data[1].VarCharValue,
  1655. MostRecentDate: d,
  1656. }
  1657. a.RIPricingByInstanceID[r.ResourceID] = r
  1658. }
  1659. klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
  1660. for k, r := range a.RIPricingByInstanceID {
  1661. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1662. }
  1663. a.RIDataLock.Unlock()
  1664. } else {
  1665. klog.Infof("No reserved instance data found")
  1666. }
  1667. } else {
  1668. klog.Infof("No reserved data available in Athena")
  1669. a.RIPricingError = nil
  1670. }
  1671. return nil
  1672. }
  1673. // ShowAthenaColumns returns a list of the names of all columns in the configured
  1674. // Athena tables
  1675. func (aws *AWS) ShowAthenaColumns() (map[string]bool, error) {
  1676. columnSet := map[string]bool{}
  1677. // Configure Athena query
  1678. cfg, err := aws.GetConfig()
  1679. if err != nil {
  1680. return nil, err
  1681. }
  1682. if cfg.AthenaTable == "" {
  1683. return nil, fmt.Errorf("AthenaTable not configured")
  1684. }
  1685. if cfg.AthenaBucketName == "" {
  1686. return nil, fmt.Errorf("AthenaBucketName not configured")
  1687. }
  1688. q := `SHOW COLUMNS IN %s`
  1689. query := fmt.Sprintf(q, cfg.AthenaTable)
  1690. results, svc, err := aws.QueryAthenaPaginated(query)
  1691. columns := []string{}
  1692. pageNum := 0
  1693. athenaErr := svc.GetQueryResultsPages(results, func(page *athena.GetQueryResultsOutput, lastpage bool) bool {
  1694. for _, row := range page.ResultSet.Rows {
  1695. columns = append(columns, *row.Data[0].VarCharValue)
  1696. }
  1697. pageNum++
  1698. return true
  1699. })
  1700. if athenaErr != nil {
  1701. log.Warningf("Error getting Athena columns: %s", err)
  1702. return columnSet, athenaErr
  1703. }
  1704. for _, col := range columns {
  1705. columnSet[col] = true
  1706. }
  1707. return columnSet, nil
  1708. }
  1709. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  1710. // "start" and "end" are dates of the format YYYY-MM-DD
  1711. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  1712. func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
  1713. customPricing, err := a.GetConfig()
  1714. if err != nil {
  1715. return nil, err
  1716. }
  1717. formattedAggregators := []string{}
  1718. for _, agg := range aggregators {
  1719. aggregator_column_name := "resource_tags_user_" + agg
  1720. aggregator_column_name = cloudutil.ConvertToGlueColumnFormat(aggregator_column_name)
  1721. formattedAggregators = append(formattedAggregators, aggregator_column_name)
  1722. }
  1723. aggregatorNames := strings.Join(formattedAggregators, ",")
  1724. aggregatorOr := strings.Join(formattedAggregators, " <> '' OR ")
  1725. aggregatorOr = aggregatorOr + " <> ''"
  1726. filter_column_name := "resource_tags_user_" + filterType
  1727. filter_column_name = cloudutil.ConvertToGlueColumnFormat(filter_column_name)
  1728. var query string
  1729. var lastIdx int
  1730. if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
  1731. lastIdx = len(formattedAggregators) + 3
  1732. groupby := generateAWSGroupBy(lastIdx)
  1733. query = fmt.Sprintf(`SELECT
  1734. CAST(line_item_usage_start_date AS DATE) as start_date,
  1735. %s,
  1736. line_item_product_code,
  1737. %s,
  1738. SUM(line_item_blended_cost) as blended_cost
  1739. FROM %s as cost_data
  1740. WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1741. GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, aggregatorOr, groupby)
  1742. } else {
  1743. lastIdx = len(formattedAggregators) + 2
  1744. groupby := generateAWSGroupBy(lastIdx)
  1745. query = fmt.Sprintf(`SELECT
  1746. CAST(line_item_usage_start_date AS DATE) as start_date,
  1747. %s,
  1748. line_item_product_code,
  1749. SUM(line_item_blended_cost) as blended_cost
  1750. FROM %s as cost_data
  1751. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1752. GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
  1753. }
  1754. var oocAllocs []*OutOfClusterAllocation
  1755. page := 0
  1756. processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
  1757. iter := op.ResultSet.Rows
  1758. if page == 0 && len(iter) > 0 {
  1759. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1760. }
  1761. page++
  1762. for _, r := range iter {
  1763. cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
  1764. if err != nil {
  1765. klog.Infof("Error converting cost `%s` from float ", *r.Data[lastIdx].VarCharValue)
  1766. }
  1767. environment := ""
  1768. for _, d := range r.Data[1 : len(formattedAggregators)+1] {
  1769. if *d.VarCharValue != "" {
  1770. environment = *d.VarCharValue // just set to the first nonempty match
  1771. }
  1772. break
  1773. }
  1774. ooc := &OutOfClusterAllocation{
  1775. Aggregator: strings.Join(aggregators, ","),
  1776. Environment: environment,
  1777. Service: *r.Data[len(formattedAggregators)+1].VarCharValue,
  1778. Cost: cost,
  1779. }
  1780. oocAllocs = append(oocAllocs, ooc)
  1781. }
  1782. return true
  1783. }
  1784. // Query for all column names in advance in order to validate configured
  1785. // label columns
  1786. columns, _ := a.ShowAthenaColumns()
  1787. // Check for all aggregators being formatted into the query
  1788. containsColumns := true
  1789. for _, agg := range formattedAggregators {
  1790. if columns[agg] != true {
  1791. containsColumns = false
  1792. klog.Warningf("Athena missing column: %s", agg)
  1793. }
  1794. }
  1795. if containsColumns {
  1796. klog.V(3).Infof("Running Query: %s", query)
  1797. ip, svc, _ := a.QueryAthenaPaginated(query)
  1798. athenaErr := svc.GetQueryResultsPages(ip, processResults)
  1799. if athenaErr != nil {
  1800. klog.Infof("RETURNING ATHENA ERROR")
  1801. return nil, athenaErr
  1802. }
  1803. if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
  1804. gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
  1805. if err != nil {
  1806. klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
  1807. }
  1808. gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
  1809. if err != nil {
  1810. klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
  1811. }
  1812. oocAllocs = append(oocAllocs, gcpOOC...)
  1813. }
  1814. } else {
  1815. klog.Infof("External Allocations: Athena Query skipped due to missing columns")
  1816. }
  1817. return oocAllocs, nil
  1818. }
  1819. // QuerySQL can query a properly configured Athena database.
  1820. // Used to fetch billing data.
  1821. // Requires a json config in /var/configs with key region, output, and database.
  1822. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1823. customPricing, err := a.GetConfig()
  1824. if err != nil {
  1825. return nil, err
  1826. }
  1827. a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
  1828. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1829. if err != nil {
  1830. return nil, err
  1831. }
  1832. defer athenaConfigs.Close()
  1833. b, err := ioutil.ReadAll(athenaConfigs)
  1834. if err != nil {
  1835. return nil, err
  1836. }
  1837. var athenaConf map[string]string
  1838. json.Unmarshal([]byte(b), &athenaConf)
  1839. region := aws.String(customPricing.AthenaRegion)
  1840. resultsBucket := customPricing.AthenaBucketName
  1841. database := customPricing.AthenaDatabase
  1842. c := &aws.Config{
  1843. Region: region,
  1844. }
  1845. s := session.Must(session.NewSession(c))
  1846. svc := athena.New(s)
  1847. var e athena.StartQueryExecutionInput
  1848. var r athena.ResultConfiguration
  1849. r.SetOutputLocation(resultsBucket)
  1850. e.SetResultConfiguration(&r)
  1851. e.SetQueryString(query)
  1852. var q athena.QueryExecutionContext
  1853. q.SetDatabase(database)
  1854. e.SetQueryExecutionContext(&q)
  1855. res, err := svc.StartQueryExecution(&e)
  1856. if err != nil {
  1857. return nil, err
  1858. }
  1859. klog.V(2).Infof("StartQueryExecution result:")
  1860. klog.V(2).Infof(res.GoString())
  1861. var qri athena.GetQueryExecutionInput
  1862. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1863. var qrop *athena.GetQueryExecutionOutput
  1864. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1865. for {
  1866. qrop, err = svc.GetQueryExecution(&qri)
  1867. if err != nil {
  1868. return nil, err
  1869. }
  1870. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1871. break
  1872. }
  1873. time.Sleep(duration)
  1874. }
  1875. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1876. var ip athena.GetQueryResultsInput
  1877. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1878. op, err := svc.GetQueryResults(&ip)
  1879. if err != nil {
  1880. return nil, err
  1881. }
  1882. b, err := json.Marshal(op.ResultSet)
  1883. if err != nil {
  1884. return nil, err
  1885. }
  1886. return b, nil
  1887. }
  1888. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1889. }
  // spotInfo is one decoded row of a spot data feed file. parseSpotData fills it
  // through csvutil using the "csv" tags below and stores rows by InstanceID.
  // All columns are kept as the raw strings found in the file.
  type spotInfo struct {
  	Timestamp   string `csv:"Timestamp"`
  	UsageType   string `csv:"UsageType"`
  	Operation   string `csv:"Operation"`
  	InstanceID  string `csv:"InstanceID"` // key of the map returned by parseSpotData
  	MyBidID     string `csv:"MyBidID"`
  	MyMaxPrice  string `csv:"MyMaxPrice"`
  	MarketPrice string `csv:"MarketPrice"`
  	Charge      string `csv:"Charge"`
  	Version     string `csv:"Version"`
  }
  1901. type fnames []*string
  1902. func (f fnames) Len() int {
  1903. return len(f)
  1904. }
  1905. func (f fnames) Swap(i, j int) {
  1906. f[i], f[j] = f[j], f[i]
  1907. }
  1908. func (f fnames) Less(i, j int) bool {
  1909. key1 := strings.Split(*f[i], ".")
  1910. key2 := strings.Split(*f[j], ".")
  1911. t1, err := time.Parse("2006-01-02-15", key1[1])
  1912. if err != nil {
  1913. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1914. return false
  1915. }
  1916. t2, err := time.Parse("2006-01-02-15", key2[1])
  1917. if err != nil {
  1918. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1919. return false
  1920. }
  1921. return t1.Before(t2)
  1922. }
  1923. func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
  1924. if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
  1925. a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1926. }
  1927. a.ConfigureAuth() // configure aws api authentication by setting env vars
  1928. s3Prefix := projectID
  1929. if len(prefix) != 0 {
  1930. s3Prefix = prefix + "/" + s3Prefix
  1931. }
  1932. c := aws.NewConfig().WithRegion(region)
  1933. s := session.Must(session.NewSession(c))
  1934. s3Svc := s3.New(s)
  1935. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1936. tNow := time.Now()
  1937. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1938. ls := &s3.ListObjectsInput{
  1939. Bucket: aws.String(bucket),
  1940. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1941. }
  1942. ls2 := &s3.ListObjectsInput{
  1943. Bucket: aws.String(bucket),
  1944. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1945. }
  1946. lso, err := s3Svc.ListObjects(ls)
  1947. if err != nil {
  1948. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1949. Message: "Bucket List Permissions Available",
  1950. Status: false,
  1951. AdditionalInfo: err.Error(),
  1952. }
  1953. return nil, err
  1954. } else {
  1955. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1956. Message: "Bucket List Permissions Available",
  1957. Status: true,
  1958. }
  1959. }
  1960. lsoLen := len(lso.Contents)
  1961. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1962. if lsoLen == 0 {
  1963. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1964. }
  1965. lso2, err := s3Svc.ListObjects(ls2)
  1966. if err != nil {
  1967. return nil, err
  1968. }
  1969. lso2Len := len(lso2.Contents)
  1970. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1971. if lso2Len == 0 {
  1972. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1973. }
  1974. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1975. var keys []*string
  1976. for _, obj := range lso.Contents {
  1977. keys = append(keys, obj.Key)
  1978. }
  1979. for _, obj := range lso2.Contents {
  1980. keys = append(keys, obj.Key)
  1981. }
  1982. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1983. header, err := csvutil.Header(spotInfo{}, "csv")
  1984. if err != nil {
  1985. return nil, err
  1986. }
  1987. fieldsPerRecord := len(header)
  1988. spots := make(map[string]*spotInfo)
  1989. for _, key := range keys {
  1990. getObj := &s3.GetObjectInput{
  1991. Bucket: aws.String(bucket),
  1992. Key: key,
  1993. }
  1994. buf := aws.NewWriteAtBuffer([]byte{})
  1995. _, err := downloader.Download(buf, getObj)
  1996. if err != nil {
  1997. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1998. Message: "Object Get Permissions Available",
  1999. Status: false,
  2000. AdditionalInfo: err.Error(),
  2001. }
  2002. return nil, err
  2003. } else {
  2004. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  2005. Message: "Object Get Permissions Available",
  2006. Status: true,
  2007. }
  2008. }
  2009. r := bytes.NewReader(buf.Bytes())
  2010. gr, err := gzip.NewReader(r)
  2011. if err != nil {
  2012. return nil, err
  2013. }
  2014. csvReader := csv.NewReader(gr)
  2015. csvReader.Comma = '\t'
  2016. csvReader.FieldsPerRecord = fieldsPerRecord
  2017. dec, err := csvutil.NewDecoder(csvReader, header...)
  2018. if err != nil {
  2019. return nil, err
  2020. }
  2021. var foundVersion string
  2022. for {
  2023. spot := spotInfo{}
  2024. err := dec.Decode(&spot)
  2025. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  2026. if err == io.EOF {
  2027. break
  2028. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  2029. rec := dec.Record()
  2030. // the first two "Record()" will be the comment lines
  2031. // and they show up as len() == 1
  2032. // the first of which is "#Version"
  2033. // the second of which is "#Fields: "
  2034. if len(rec) != 1 {
  2035. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  2036. continue
  2037. }
  2038. if len(foundVersion) == 0 {
  2039. spotFeedVersion := rec[0]
  2040. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  2041. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  2042. if matches != nil {
  2043. foundVersion = matches[1]
  2044. if foundVersion != supportedSpotFeedVersion {
  2045. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  2046. break
  2047. }
  2048. }
  2049. continue
  2050. } else if strings.Index(rec[0], "#") == 0 {
  2051. continue
  2052. } else {
  2053. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  2054. continue
  2055. }
  2056. } else if err != nil {
  2057. klog.V(2).Infof("Error during spot info decode: %+v", err)
  2058. continue
  2059. }
  2060. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  2061. spots[spot.InstanceID] = &spot
  2062. }
  2063. gr.Close()
  2064. }
  2065. return spots, nil
  2066. }
  2067. func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  2068. }
  2069. func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  2070. checks := []*ServiceAccountCheck{}
  2071. for _, v := range a.ServiceAccountChecks {
  2072. checks = append(checks, v)
  2073. }
  2074. return &ServiceAccountStatus{
  2075. Checks: checks,
  2076. }
  2077. }
  2078. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  2079. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  2080. }
  2081. func (aws *AWS) Regions() []string {
  2082. return awsRegions
  2083. }