awsprovider.go 68 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/csv"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/aws/aws-sdk-go/aws/endpoints"
  18. "k8s.io/klog"
  19. "github.com/kubecost/cost-model/pkg/clustercache"
  20. "github.com/kubecost/cost-model/pkg/env"
  21. "github.com/kubecost/cost-model/pkg/errors"
  22. "github.com/kubecost/cost-model/pkg/log"
  23. "github.com/kubecost/cost-model/pkg/util"
  24. "github.com/kubecost/cost-model/pkg/util/cloudutil"
  25. "github.com/kubecost/cost-model/pkg/util/fileutil"
  26. "github.com/kubecost/cost-model/pkg/util/json"
  27. "github.com/aws/aws-sdk-go/aws"
  28. "github.com/aws/aws-sdk-go/aws/awserr"
  29. "github.com/aws/aws-sdk-go/aws/credentials"
  30. "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
  31. "github.com/aws/aws-sdk-go/aws/session"
  32. "github.com/aws/aws-sdk-go/service/athena"
  33. "github.com/aws/aws-sdk-go/service/ec2"
  34. "github.com/aws/aws-sdk-go/service/s3"
  35. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  36. awsV2 "github.com/aws/aws-sdk-go-v2/aws"
  37. "github.com/jszwec/csvutil"
  38. v1 "k8s.io/api/core/v1"
  39. )
  40. const supportedSpotFeedVersion = "1"
  41. const SpotInfoUpdateType = "spotinfo"
  42. const AthenaInfoUpdateType = "athenainfo"
  43. const PreemptibleType = "preemptible"
  44. const APIPricingSource = "Public API"
  45. const SpotPricingSource = "Spot Data Feed"
  46. const ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
  47. func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
  48. sources := make(map[string]*PricingSource)
  49. sps := &PricingSource{
  50. Name: SpotPricingSource,
  51. }
  52. sps.Error = ""
  53. if aws.SpotPricingError != nil {
  54. sps.Error = aws.SpotPricingError.Error()
  55. }
  56. if sps.Error != "" {
  57. sps.Available = false
  58. } else if len(aws.SpotPricingByInstanceID) > 0 {
  59. sps.Available = true
  60. } else {
  61. sps.Error = "No spot instances detected"
  62. }
  63. sources[SpotPricingSource] = sps
  64. rps := &PricingSource{
  65. Name: ReservedInstancePricingSource,
  66. }
  67. rps.Error = ""
  68. if aws.RIPricingError != nil {
  69. rps.Error = aws.RIPricingError.Error()
  70. }
  71. if rps.Error != "" {
  72. rps.Available = false
  73. } else {
  74. rps.Available = true
  75. }
  76. sources[ReservedInstancePricingSource] = rps
  77. return sources
  78. }
  79. // How often spot data is refreshed
  80. const SpotRefreshDuration = 15 * time.Minute
  81. const defaultConfigPath = "/var/configs/"
  82. var awsRegions = []string{
  83. "us-east-2",
  84. "us-east-1",
  85. "us-west-1",
  86. "us-west-2",
  87. "ap-east-1",
  88. "ap-south-1",
  89. "ap-northeast-3",
  90. "ap-northeast-2",
  91. "ap-southeast-1",
  92. "ap-southeast-2",
  93. "ap-northeast-1",
  94. "ca-central-1",
  95. "cn-north-1",
  96. "cn-northwest-1",
  97. "eu-central-1",
  98. "eu-west-1",
  99. "eu-west-2",
  100. "eu-west-3",
  101. "eu-north-1",
  102. "me-south-1",
  103. "sa-east-1",
  104. "us-gov-east-1",
  105. "us-gov-west-1",
  106. }
  107. // AWS represents an Amazon Provider
  108. type AWS struct {
  109. Pricing map[string]*AWSProductTerms
  110. SpotPricingByInstanceID map[string]*spotInfo
  111. SpotPricingUpdatedAt *time.Time
  112. SpotRefreshRunning bool
  113. SpotPricingLock sync.RWMutex
  114. SpotPricingError error
  115. RIPricingByInstanceID map[string]*RIData
  116. RIPricingError error
  117. RIDataRunning bool
  118. RIDataLock sync.RWMutex
  119. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  120. SavingsPlanDataRunning bool
  121. SavingsPlanDataLock sync.RWMutex
  122. ValidPricingKeys map[string]bool
  123. Clientset clustercache.ClusterCache
  124. BaseCPUPrice string
  125. BaseRAMPrice string
  126. BaseGPUPrice string
  127. BaseSpotCPUPrice string
  128. BaseSpotRAMPrice string
  129. BaseSpotGPUPrice string
  130. SpotLabelName string
  131. SpotLabelValue string
  132. SpotDataRegion string
  133. SpotDataBucket string
  134. SpotDataPrefix string
  135. ProjectID string
  136. DownloadPricingDataLock sync.RWMutex
  137. Config *ProviderConfig
  138. ServiceAccountChecks map[string]*ServiceAccountCheck
  139. clusterManagementPrice float64
  140. clusterAccountId string
  141. clusterRegion string
  142. clusterProvisioner string
  143. *CustomProvider
  144. }
  145. type AWSAccessKey struct {
  146. AccessKeyID string `json:"aws_access_key_id"`
  147. SecretAccessKey string `json:"aws_secret_access_key"`
  148. }
  149. // Retrieve returns a set of awsV2 credentials using the AWSAccessKey's key and secret.
  150. // This fullfils the awsV2.CredentialsProvider interface contract.
  151. func (accessKey AWSAccessKey) Retrieve(ctx context.Context) (awsV2.Credentials, error) {
  152. return awsV2.Credentials{
  153. AccessKeyID: accessKey.AccessKeyID,
  154. SecretAccessKey: accessKey.SecretAccessKey,
  155. }, nil
  156. }
  157. // AWSPricing maps a k8s node to an AWS Pricing "product"
  158. type AWSPricing struct {
  159. Products map[string]*AWSProduct `json:"products"`
  160. Terms AWSPricingTerms `json:"terms"`
  161. }
  162. // AWSProduct represents a purchased SKU
  163. type AWSProduct struct {
  164. Sku string `json:"sku"`
  165. Attributes AWSProductAttributes `json:"attributes"`
  166. }
  167. // AWSProductAttributes represents metadata about the product used to map to a node.
  168. type AWSProductAttributes struct {
  169. Location string `json:"location"`
  170. InstanceType string `json:"instanceType"`
  171. Memory string `json:"memory"`
  172. Storage string `json:"storage"`
  173. VCpu string `json:"vcpu"`
  174. UsageType string `json:"usagetype"`
  175. OperatingSystem string `json:"operatingSystem"`
  176. PreInstalledSw string `json:"preInstalledSw"`
  177. InstanceFamily string `json:"instanceFamily"`
  178. CapacityStatus string `json:"capacitystatus"`
  179. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  180. }
  181. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  182. type AWSPricingTerms struct {
  183. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  184. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  185. }
  186. // AWSOfferTerm is a sku extension used to pay for the node.
  187. type AWSOfferTerm struct {
  188. Sku string `json:"sku"`
  189. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  190. }
  191. func (ot *AWSOfferTerm) String() string {
  192. var strs []string
  193. for k, rc := range ot.PriceDimensions {
  194. strs = append(strs, fmt.Sprintf("%s:%s", k, rc.String()))
  195. }
  196. return fmt.Sprintf("%s:%s", ot.Sku, strings.Join(strs, ","))
  197. }
  198. // AWSRateCode encodes data about the price of a product
  199. type AWSRateCode struct {
  200. Unit string `json:"unit"`
  201. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  202. }
  203. func (rc *AWSRateCode) String() string {
  204. return fmt.Sprintf("{unit: %s, pricePerUnit: %v", rc.Unit, rc.PricePerUnit)
  205. }
  206. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  207. type AWSCurrencyCode struct {
  208. USD string `json:"USD,omitempty"`
  209. CNY string `json:"CNY,omitempty"`
  210. }
  211. // AWSProductTerms represents the full terms of the product
  212. type AWSProductTerms struct {
  213. Sku string `json:"sku"`
  214. OnDemand *AWSOfferTerm `json:"OnDemand"`
  215. Reserved *AWSOfferTerm `json:"Reserved"`
  216. Memory string `json:"memory"`
  217. Storage string `json:"storage"`
  218. VCpu string `json:"vcpu"`
  219. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  220. PV *PV `json:"pv"`
  221. }
  222. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  223. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  224. // OnDemandRateCode is appended to an node sku
  225. const OnDemandRateCode = ".JRTCKXETXF"
  226. const OnDemandRateCodeCn = ".99YE2YK9UR"
  227. // ReservedRateCode is appended to a node sku
  228. const ReservedRateCode = ".38NPMPTW36"
  229. // HourlyRateCode is appended to a node sku
  230. const HourlyRateCode = ".6YS6EN2CT7"
  231. const HourlyRateCodeCn = ".Q7UJUT2CE6"
  232. // volTypes are used to map between AWS UsageTypes and
  233. // EBS volume types, as they would appear in K8s storage class
  234. // name and the EC2 API.
  235. var volTypes = map[string]string{
  236. "EBS:VolumeUsage.gp2": "gp2",
  237. "EBS:VolumeUsage": "standard",
  238. "EBS:VolumeUsage.sc1": "sc1",
  239. "EBS:VolumeP-IOPS.piops": "io1",
  240. "EBS:VolumeUsage.st1": "st1",
  241. "EBS:VolumeUsage.piops": "io1",
  242. "gp2": "EBS:VolumeUsage.gp2",
  243. "standard": "EBS:VolumeUsage",
  244. "sc1": "EBS:VolumeUsage.sc1",
  245. "io1": "EBS:VolumeUsage.piops",
  246. "st1": "EBS:VolumeUsage.st1",
  247. }
  248. // locationToRegion maps AWS region names (As they come from Billing)
  249. // to actual region identifiers
  250. var locationToRegion = map[string]string{
  251. "US East (Ohio)": "us-east-2",
  252. "US East (N. Virginia)": "us-east-1",
  253. "US West (N. California)": "us-west-1",
  254. "US West (Oregon)": "us-west-2",
  255. "Asia Pacific (Hong Kong)": "ap-east-1",
  256. "Asia Pacific (Mumbai)": "ap-south-1",
  257. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  258. "Asia Pacific (Seoul)": "ap-northeast-2",
  259. "Asia Pacific (Singapore)": "ap-southeast-1",
  260. "Asia Pacific (Sydney)": "ap-southeast-2",
  261. "Asia Pacific (Tokyo)": "ap-northeast-1",
  262. "Canada (Central)": "ca-central-1",
  263. "China (Beijing)": "cn-north-1",
  264. "China (Ningxia)": "cn-northwest-1",
  265. "EU (Frankfurt)": "eu-central-1",
  266. "EU (Ireland)": "eu-west-1",
  267. "EU (London)": "eu-west-2",
  268. "EU (Paris)": "eu-west-3",
  269. "EU (Stockholm)": "eu-north-1",
  270. "South America (Sao Paulo)": "sa-east-1",
  271. "AWS GovCloud (US-East)": "us-gov-east-1",
  272. "AWS GovCloud (US-West)": "us-gov-west-1",
  273. }
  274. var regionToBillingRegionCode = map[string]string{
  275. "us-east-2": "USE2",
  276. "us-east-1": "",
  277. "us-west-1": "USW1",
  278. "us-west-2": "USW2",
  279. "ap-east-1": "APE1",
  280. "ap-south-1": "APS3",
  281. "ap-northeast-3": "APN3",
  282. "ap-northeast-2": "APN2",
  283. "ap-southeast-1": "APS1",
  284. "ap-southeast-2": "APS2",
  285. "ap-northeast-1": "APN1",
  286. "ca-central-1": "CAN1",
  287. "cn-north-1": "",
  288. "cn-northwest-1": "",
  289. "eu-central-1": "EUC1",
  290. "eu-west-1": "EU",
  291. "eu-west-2": "EUW2",
  292. "eu-west-3": "EUW3",
  293. "eu-north-1": "EUN1",
  294. "sa-east-1": "SAE1",
  295. "us-gov-east-1": "UGE1",
  296. "us-gov-west-1": "UGW1",
  297. }
  298. var loadedAWSSecret bool = false
  299. var awsSecret *AWSAccessKey = nil
  300. func (aws *AWS) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
  301. return ""
  302. }
  303. // KubeAttrConversion maps the k8s labels for region to an aws region
  304. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  305. operatingSystem = strings.ToLower(operatingSystem)
  306. region := locationToRegion[location]
  307. return region + "," + instanceType + "," + operatingSystem
  308. }
  309. type AwsSpotFeedInfo struct {
  310. BucketName string `json:"bucketName"`
  311. Prefix string `json:"prefix"`
  312. Region string `json:"region"`
  313. AccountID string `json:"projectID"`
  314. ServiceKeyName string `json:"serviceKeyName"`
  315. ServiceKeySecret string `json:"serviceKeySecret"`
  316. SpotLabel string `json:"spotLabel"`
  317. SpotLabelValue string `json:"spotLabelValue"`
  318. }
  319. type AwsAthenaInfo struct {
  320. AthenaBucketName string `json:"athenaBucketName"`
  321. AthenaRegion string `json:"athenaRegion"`
  322. AthenaDatabase string `json:"athenaDatabase"`
  323. AthenaTable string `json:"athenaTable"`
  324. ServiceKeyName string `json:"serviceKeyName"`
  325. ServiceKeySecret string `json:"serviceKeySecret"`
  326. AccountID string `json:"projectID"`
  327. MasterPayerARN string `json:"masterPayerARN"`
  328. }
  329. func (aws *AWS) GetManagementPlatform() (string, error) {
  330. nodes := aws.Clientset.GetAllNodes()
  331. if len(nodes) > 0 {
  332. n := nodes[0]
  333. version := n.Status.NodeInfo.KubeletVersion
  334. if strings.Contains(version, "eks") {
  335. return "eks", nil
  336. }
  337. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  338. return "kops", nil
  339. }
  340. }
  341. return "", nil
  342. }
  343. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  344. c, err := aws.Config.GetCustomPricingData()
  345. if err != nil {
  346. return nil, err
  347. }
  348. if c.Discount == "" {
  349. c.Discount = "0%"
  350. }
  351. if c.NegotiatedDiscount == "" {
  352. c.NegotiatedDiscount = "0%"
  353. }
  354. if c.ShareTenancyCosts == "" {
  355. c.ShareTenancyCosts = defaultShareTenancyCost
  356. }
  357. return c, nil
  358. }
  359. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  360. return aws.Config.UpdateFromMap(a)
  361. }
  362. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  363. return aws.Config.Update(func(c *CustomPricing) error {
  364. if updateType == SpotInfoUpdateType {
  365. a := AwsSpotFeedInfo{}
  366. err := json.NewDecoder(r).Decode(&a)
  367. if err != nil {
  368. return err
  369. }
  370. c.ServiceKeyName = a.ServiceKeyName
  371. if a.ServiceKeySecret != "" {
  372. c.ServiceKeySecret = a.ServiceKeySecret
  373. }
  374. c.SpotDataPrefix = a.Prefix
  375. c.SpotDataBucket = a.BucketName
  376. c.ProjectID = a.AccountID
  377. c.SpotDataRegion = a.Region
  378. c.SpotLabel = a.SpotLabel
  379. c.SpotLabelValue = a.SpotLabelValue
  380. } else if updateType == AthenaInfoUpdateType {
  381. a := AwsAthenaInfo{}
  382. err := json.NewDecoder(r).Decode(&a)
  383. if err != nil {
  384. return err
  385. }
  386. c.AthenaBucketName = a.AthenaBucketName
  387. c.AthenaRegion = a.AthenaRegion
  388. c.AthenaDatabase = a.AthenaDatabase
  389. c.AthenaTable = a.AthenaTable
  390. c.ServiceKeyName = a.ServiceKeyName
  391. if a.ServiceKeySecret != "" {
  392. c.ServiceKeySecret = a.ServiceKeySecret
  393. }
  394. if a.MasterPayerARN != "" {
  395. c.MasterPayerARN = a.MasterPayerARN
  396. }
  397. c.AthenaProjectID = a.AccountID
  398. } else {
  399. a := make(map[string]interface{})
  400. err := json.NewDecoder(r).Decode(&a)
  401. if err != nil {
  402. return err
  403. }
  404. for k, v := range a {
  405. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  406. vstr, ok := v.(string)
  407. if ok {
  408. err := SetCustomPricingField(c, kUpper, vstr)
  409. if err != nil {
  410. return err
  411. }
  412. } else {
  413. return fmt.Errorf("type error while updating config for %s", kUpper)
  414. }
  415. }
  416. }
  417. if env.IsRemoteEnabled() {
  418. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  419. if err != nil {
  420. return err
  421. }
  422. }
  423. return nil
  424. })
  425. }
  426. type awsKey struct {
  427. SpotLabelName string
  428. SpotLabelValue string
  429. Labels map[string]string
  430. ProviderID string
  431. }
  432. func (k *awsKey) GPUType() string {
  433. return ""
  434. }
  435. func (k *awsKey) ID() string {
  436. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  437. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  438. if matchNum == 2 {
  439. return group
  440. }
  441. }
  442. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  443. return ""
  444. }
  445. func (k *awsKey) Features() string {
  446. instanceType, _ := util.GetInstanceType(k.Labels)
  447. operatingSystem, _ := util.GetOperatingSystem(k.Labels)
  448. region, _ := util.GetRegion(k.Labels)
  449. key := region + "," + instanceType + "," + operatingSystem
  450. usageType := PreemptibleType
  451. spotKey := key + "," + usageType
  452. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  453. return spotKey
  454. }
  455. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  456. return spotKey
  457. }
  458. return key
  459. }
  460. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  461. pricing, ok := aws.Pricing[pvk.Features()]
  462. if !ok {
  463. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  464. return &PV{}, nil
  465. }
  466. return pricing.PV, nil
  467. }
  468. type awsPVKey struct {
  469. Labels map[string]string
  470. StorageClassParameters map[string]string
  471. StorageClassName string
  472. Name string
  473. DefaultRegion string
  474. ProviderID string
  475. }
  476. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  477. providerID := ""
  478. if pv.Spec.AWSElasticBlockStore != nil {
  479. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  480. } else if pv.Spec.CSI != nil {
  481. providerID = pv.Spec.CSI.VolumeHandle
  482. }
  483. return &awsPVKey{
  484. Labels: pv.Labels,
  485. StorageClassName: pv.Spec.StorageClassName,
  486. StorageClassParameters: parameters,
  487. Name: pv.Name,
  488. DefaultRegion: defaultRegion,
  489. ProviderID: providerID,
  490. }
  491. }
  492. func (key *awsPVKey) ID() string {
  493. return key.ProviderID
  494. }
  495. func (key *awsPVKey) GetStorageClass() string {
  496. return key.StorageClassName
  497. }
  498. func (key *awsPVKey) Features() string {
  499. storageClass := key.StorageClassParameters["type"]
  500. if storageClass == "standard" {
  501. storageClass = "gp2"
  502. }
  503. // Storage class names are generally EBS volume types (gp2)
  504. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  505. // Converts between the 2
  506. region, ok := util.GetRegion(key.Labels)
  507. if !ok {
  508. region = key.DefaultRegion
  509. }
  510. class, ok := volTypes[storageClass]
  511. if !ok {
  512. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  513. }
  514. return region + "," + class
  515. }
  516. // GetKey maps node labels to information needed to retrieve pricing data
  517. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  518. return &awsKey{
  519. SpotLabelName: aws.SpotLabelName,
  520. SpotLabelValue: aws.SpotLabelValue,
  521. Labels: labels,
  522. ProviderID: labels["providerID"],
  523. }
  524. }
  525. func (aws *AWS) isPreemptible(key string) bool {
  526. s := strings.Split(key, ",")
  527. if len(s) == 4 && s[3] == PreemptibleType {
  528. return true
  529. }
  530. return false
  531. }
  532. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  533. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  534. }
  535. // Use the pricing data from the current region. Fall back to using all region data if needed.
  536. func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, error) {
  537. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/"
  538. region := ""
  539. multiregion := false
  540. for _, n := range nodeList {
  541. labels := n.GetLabels()
  542. currentNodeRegion := ""
  543. if r, ok := util.GetRegion(labels); ok {
  544. currentNodeRegion = r
  545. // Switch to Chinese endpoint for regions with the Chinese prefix
  546. if strings.HasPrefix(currentNodeRegion, "cn-") {
  547. pricingURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/AmazonEC2/current/"
  548. }
  549. } else {
  550. multiregion = true // We weren't able to detect the node's region, so pull all data.
  551. break
  552. }
  553. if region == "" { // We haven't set a region yet
  554. region = currentNodeRegion
  555. } else if region != "" && currentNodeRegion != region { // If two nodes have different regions here, we'll need to fetch all pricing data.
  556. multiregion = true
  557. break
  558. }
  559. }
  560. // Chinese multiregion endpoint only contains data for Chinese regions and Chinese regions are excluded from other endpoint
  561. if region != "" && !multiregion {
  562. pricingURL += region + "/"
  563. }
  564. pricingURL += "index.json"
  565. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  566. resp, err := http.Get(pricingURL)
  567. if err != nil {
  568. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  569. return nil, pricingURL, err
  570. }
  571. return resp, pricingURL, err
  572. }
  573. // DownloadPricingData fetches data from the AWS Pricing API
  574. func (aws *AWS) DownloadPricingData() error {
  575. aws.DownloadPricingDataLock.Lock()
  576. defer aws.DownloadPricingDataLock.Unlock()
  577. if aws.ServiceAccountChecks == nil {
  578. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  579. }
  580. c, err := aws.Config.GetCustomPricingData()
  581. if err != nil {
  582. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  583. }
  584. aws.BaseCPUPrice = c.CPU
  585. aws.BaseRAMPrice = c.RAM
  586. aws.BaseGPUPrice = c.GPU
  587. aws.BaseSpotCPUPrice = c.SpotCPU
  588. aws.BaseSpotRAMPrice = c.SpotRAM
  589. aws.BaseSpotGPUPrice = c.SpotGPU
  590. aws.SpotLabelName = c.SpotLabel
  591. aws.SpotLabelValue = c.SpotLabelValue
  592. aws.SpotDataBucket = c.SpotDataBucket
  593. aws.SpotDataPrefix = c.SpotDataPrefix
  594. aws.ProjectID = c.ProjectID
  595. aws.SpotDataRegion = c.SpotDataRegion
  596. aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
  597. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  598. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  599. }
  600. nodeList := aws.Clientset.GetAllNodes()
  601. inputkeys := make(map[string]bool)
  602. for _, n := range nodeList {
  603. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  604. aws.clusterManagementPrice = 0.10
  605. aws.clusterProvisioner = "EKS"
  606. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  607. aws.clusterProvisioner = "KOPS"
  608. }
  609. labels := n.GetObjectMeta().GetLabels()
  610. key := aws.GetKey(labels, n)
  611. inputkeys[key.Features()] = true
  612. }
  613. pvList := aws.Clientset.GetAllPersistentVolumes()
  614. storageClasses := aws.Clientset.GetAllStorageClasses()
  615. storageClassMap := make(map[string]map[string]string)
  616. for _, storageClass := range storageClasses {
  617. params := storageClass.Parameters
  618. storageClassMap[storageClass.ObjectMeta.Name] = params
  619. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  620. storageClassMap["default"] = params
  621. storageClassMap[""] = params
  622. }
  623. }
  624. pvkeys := make(map[string]PVKey)
  625. for _, pv := range pvList {
  626. params, ok := storageClassMap[pv.Spec.StorageClassName]
  627. if !ok {
  628. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  629. continue
  630. }
  631. key := aws.GetPVKey(pv, params, "")
  632. pvkeys[key.Features()] = key
  633. }
  634. // RIDataRunning establishes the existance of the goroutine. Since it's possible we
  635. // run multiple downloads, we don't want to create multiple go routines if one already exists
  636. if !aws.RIDataRunning && c.AthenaBucketName != "" {
  637. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  638. if err != nil {
  639. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  640. } else { // If we make one successful run, check on new reservation data every hour
  641. go func() {
  642. defer errors.HandlePanic()
  643. aws.RIDataRunning = true
  644. for {
  645. klog.Infof("Reserved Instance watcher running... next update in 1h")
  646. time.Sleep(time.Hour)
  647. err := aws.GetReservationDataFromAthena()
  648. if err != nil {
  649. klog.Infof("Error updating RI data: %s", err.Error())
  650. }
  651. }
  652. }()
  653. }
  654. }
  655. if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
  656. err = aws.GetSavingsPlanDataFromAthena()
  657. if err != nil {
  658. klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
  659. } else {
  660. go func() {
  661. defer errors.HandlePanic()
  662. aws.SavingsPlanDataRunning = true
  663. for {
  664. klog.Infof("Savings Plan watcher running... next update in 1h")
  665. time.Sleep(time.Hour)
  666. err := aws.GetSavingsPlanDataFromAthena()
  667. if err != nil {
  668. klog.Infof("Error updating Savings Plan data: %s", err.Error())
  669. }
  670. }
  671. }()
  672. }
  673. }
  674. aws.Pricing = make(map[string]*AWSProductTerms)
  675. aws.ValidPricingKeys = make(map[string]bool)
  676. skusToKeys := make(map[string]string)
  677. resp, pricingURL, err := aws.getRegionPricing(nodeList)
  678. if err != nil {
  679. return err
  680. }
  681. dec := json.NewDecoder(resp.Body)
  682. for {
  683. t, err := dec.Token()
  684. if err == io.EOF {
  685. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  686. break
  687. } else if err != nil {
  688. klog.V(2).Infof("error parsing response json %v", resp.Body)
  689. break
  690. }
  691. if t == "products" {
  692. _, err := dec.Token() // this should parse the opening "{""
  693. if err != nil {
  694. return err
  695. }
  696. for dec.More() {
  697. _, err := dec.Token() // the sku token
  698. if err != nil {
  699. return err
  700. }
  701. product := &AWSProduct{}
  702. err = dec.Decode(&product)
  703. if err != nil {
  704. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  705. break
  706. }
  707. if product.Attributes.PreInstalledSw == "NA" &&
  708. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
  709. product.Attributes.CapacityStatus == "Used" {
  710. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  711. spotKey := key + ",preemptible"
  712. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  713. productTerms := &AWSProductTerms{
  714. Sku: product.Sku,
  715. Memory: product.Attributes.Memory,
  716. Storage: product.Attributes.Storage,
  717. VCpu: product.Attributes.VCpu,
  718. GPU: product.Attributes.GPU,
  719. }
  720. aws.Pricing[key] = productTerms
  721. aws.Pricing[spotKey] = productTerms
  722. skusToKeys[product.Sku] = key
  723. }
  724. aws.ValidPricingKeys[key] = true
  725. aws.ValidPricingKeys[spotKey] = true
  726. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  727. // UsageTypes may be prefixed with a region code - we're removing this when using
  728. // volTypes to keep lookups generic
  729. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  730. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  731. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  732. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  733. spotKey := key + ",preemptible"
  734. pv := &PV{
  735. Class: volTypes[usageTypeNoRegion],
  736. Region: locationToRegion[product.Attributes.Location],
  737. }
  738. productTerms := &AWSProductTerms{
  739. Sku: product.Sku,
  740. PV: pv,
  741. }
  742. aws.Pricing[key] = productTerms
  743. aws.Pricing[spotKey] = productTerms
  744. skusToKeys[product.Sku] = key
  745. aws.ValidPricingKeys[key] = true
  746. aws.ValidPricingKeys[spotKey] = true
  747. }
  748. }
  749. }
  750. if t == "terms" {
  751. _, err := dec.Token() // this should parse the opening "{""
  752. if err != nil {
  753. return err
  754. }
  755. termType, err := dec.Token()
  756. if err != nil {
  757. return err
  758. }
  759. if termType == "OnDemand" {
  760. _, err := dec.Token()
  761. if err != nil { // again, should parse an opening "{"
  762. return err
  763. }
  764. for dec.More() {
  765. sku, err := dec.Token()
  766. if err != nil {
  767. return err
  768. }
  769. _, err = dec.Token() // another opening "{"
  770. if err != nil {
  771. return err
  772. }
  773. skuOnDemand, err := dec.Token()
  774. if err != nil {
  775. return err
  776. }
  777. offerTerm := &AWSOfferTerm{}
  778. err = dec.Decode(&offerTerm)
  779. if err != nil {
  780. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  781. }
  782. key, ok := skusToKeys[sku.(string)]
  783. spotKey := key + ",preemptible"
  784. if ok {
  785. aws.Pricing[key].OnDemand = offerTerm
  786. aws.Pricing[spotKey].OnDemand = offerTerm
  787. var cost string
  788. if sku.(string)+OnDemandRateCode == skuOnDemand {
  789. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  790. } else if sku.(string)+OnDemandRateCodeCn == skuOnDemand {
  791. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCodeCn+HourlyRateCodeCn].PricePerUnit.CNY
  792. }
  793. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  794. // If the specific UsageType is the per IO cost used on io1 volumes
  795. // we need to add the per IO cost to the io1 PV cost
  796. // Add the per IO cost to the PV object for the io1 volume type
  797. aws.Pricing[key].PV.CostPerIO = cost
  798. } else if strings.Contains(key, "EBS:Volume") {
  799. // If volume, we need to get hourly cost and add it to the PV object
  800. costFloat, _ := strconv.ParseFloat(cost, 64)
  801. hourlyPrice := costFloat / 730
  802. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  803. }
  804. }
  805. _, err = dec.Token()
  806. if err != nil {
  807. return err
  808. }
  809. }
  810. _, err = dec.Token()
  811. if err != nil {
  812. return err
  813. }
  814. }
  815. }
  816. }
  817. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  818. // Always run spot pricing refresh when performing download
  819. aws.refreshSpotPricing(true)
  820. // Only start a single refresh goroutine
  821. if !aws.SpotRefreshRunning {
  822. aws.SpotRefreshRunning = true
  823. go func() {
  824. defer errors.HandlePanic()
  825. for {
  826. klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  827. time.Sleep(SpotRefreshDuration)
  828. // Reoccurring refresh checks update times
  829. aws.refreshSpotPricing(false)
  830. }
  831. }()
  832. }
  833. return nil
  834. }
  835. func (aws *AWS) refreshSpotPricing(force bool) {
  836. aws.SpotPricingLock.Lock()
  837. defer aws.SpotPricingLock.Unlock()
  838. now := time.Now().UTC()
  839. updateTime := now.Add(-SpotRefreshDuration)
  840. // Return if there was an update time set and an hour hasn't elapsed
  841. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  842. return
  843. }
  844. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
  845. if err != nil {
  846. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  847. aws.SpotPricingError = err
  848. return
  849. }
  850. aws.SpotPricingError = nil
  851. // update time last updated
  852. aws.SpotPricingUpdatedAt = &now
  853. aws.SpotPricingByInstanceID = sp
  854. }
  855. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  856. func (aws *AWS) NetworkPricing() (*Network, error) {
  857. cpricing, err := aws.Config.GetCustomPricingData()
  858. if err != nil {
  859. return nil, err
  860. }
  861. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  862. if err != nil {
  863. return nil, err
  864. }
  865. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  866. if err != nil {
  867. return nil, err
  868. }
  869. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  870. if err != nil {
  871. return nil, err
  872. }
  873. return &Network{
  874. ZoneNetworkEgressCost: znec,
  875. RegionNetworkEgressCost: rnec,
  876. InternetNetworkEgressCost: inec,
  877. }, nil
  878. }
  879. func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
  880. fffrc := 0.025
  881. afrc := 0.010
  882. lbidc := 0.008
  883. numForwardingRules := 1.0
  884. dataIngressGB := 0.0
  885. var totalCost float64
  886. if numForwardingRules < 5 {
  887. totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
  888. } else {
  889. totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
  890. }
  891. return &LoadBalancer{
  892. Cost: totalCost,
  893. }, nil
  894. }
  895. // AllNodePricing returns all the billing data fetched.
  896. func (aws *AWS) AllNodePricing() (interface{}, error) {
  897. aws.DownloadPricingDataLock.RLock()
  898. defer aws.DownloadPricingDataLock.RUnlock()
  899. return aws.Pricing, nil
  900. }
  901. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  902. aws.SpotPricingLock.RLock()
  903. defer aws.SpotPricingLock.RUnlock()
  904. info, ok := aws.SpotPricingByInstanceID[instanceID]
  905. return info, ok
  906. }
  907. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  908. aws.RIDataLock.RLock()
  909. defer aws.RIDataLock.RUnlock()
  910. data, ok := aws.RIPricingByInstanceID[instanceID]
  911. return data, ok
  912. }
  913. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  914. aws.SavingsPlanDataLock.RLock()
  915. defer aws.SavingsPlanDataLock.RUnlock()
  916. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  917. return data, ok
  918. }
  919. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  920. key := k.Features()
  921. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  922. var spotcost string
  923. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  924. arr := strings.Split(spotInfo.Charge, " ")
  925. if len(arr) == 2 {
  926. spotcost = arr[0]
  927. } else {
  928. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  929. }
  930. return &Node{
  931. Cost: spotcost,
  932. VCPU: terms.VCpu,
  933. RAM: terms.Memory,
  934. GPU: terms.GPU,
  935. Storage: terms.Storage,
  936. BaseCPUPrice: aws.BaseCPUPrice,
  937. BaseRAMPrice: aws.BaseRAMPrice,
  938. BaseGPUPrice: aws.BaseGPUPrice,
  939. UsageType: PreemptibleType,
  940. }, nil
  941. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  942. log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
  943. return &Node{
  944. VCPU: terms.VCpu,
  945. VCPUCost: aws.BaseSpotCPUPrice,
  946. RAM: terms.Memory,
  947. GPU: terms.GPU,
  948. Storage: terms.Storage,
  949. BaseCPUPrice: aws.BaseCPUPrice,
  950. BaseRAMPrice: aws.BaseRAMPrice,
  951. BaseGPUPrice: aws.BaseGPUPrice,
  952. UsageType: PreemptibleType,
  953. }, nil
  954. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  955. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  956. return &Node{
  957. Cost: strCost,
  958. VCPU: terms.VCpu,
  959. RAM: terms.Memory,
  960. GPU: terms.GPU,
  961. Storage: terms.Storage,
  962. BaseCPUPrice: aws.BaseCPUPrice,
  963. BaseRAMPrice: aws.BaseRAMPrice,
  964. BaseGPUPrice: aws.BaseGPUPrice,
  965. UsageType: usageType,
  966. }, nil
  967. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  968. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  969. return &Node{
  970. Cost: strCost,
  971. VCPU: terms.VCpu,
  972. RAM: terms.Memory,
  973. GPU: terms.GPU,
  974. Storage: terms.Storage,
  975. BaseCPUPrice: aws.BaseCPUPrice,
  976. BaseRAMPrice: aws.BaseRAMPrice,
  977. BaseGPUPrice: aws.BaseGPUPrice,
  978. UsageType: usageType,
  979. }, nil
  980. }
  981. var cost string
  982. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  983. if ok {
  984. cost = c.PricePerUnit.USD
  985. } else {
  986. // Check for Chinese pricing before throwing error
  987. c, ok = terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCodeCn+HourlyRateCodeCn]
  988. if ok {
  989. cost = c.PricePerUnit.CNY
  990. } else {
  991. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  992. }
  993. }
  994. return &Node{
  995. Cost: cost,
  996. VCPU: terms.VCpu,
  997. RAM: terms.Memory,
  998. GPU: terms.GPU,
  999. Storage: terms.Storage,
  1000. BaseCPUPrice: aws.BaseCPUPrice,
  1001. BaseRAMPrice: aws.BaseRAMPrice,
  1002. BaseGPUPrice: aws.BaseGPUPrice,
  1003. UsageType: usageType,
  1004. }, nil
  1005. }
  1006. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  1007. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  1008. aws.DownloadPricingDataLock.RLock()
  1009. defer aws.DownloadPricingDataLock.RUnlock()
  1010. key := k.Features()
  1011. usageType := "ondemand"
  1012. if aws.isPreemptible(key) {
  1013. usageType = PreemptibleType
  1014. }
  1015. terms, ok := aws.Pricing[key]
  1016. if ok {
  1017. return aws.createNode(terms, usageType, k)
  1018. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  1019. aws.DownloadPricingDataLock.RUnlock()
  1020. err := aws.DownloadPricingData()
  1021. aws.DownloadPricingDataLock.RLock()
  1022. if err != nil {
  1023. return &Node{
  1024. Cost: aws.BaseCPUPrice,
  1025. BaseCPUPrice: aws.BaseCPUPrice,
  1026. BaseRAMPrice: aws.BaseRAMPrice,
  1027. BaseGPUPrice: aws.BaseGPUPrice,
  1028. UsageType: usageType,
  1029. UsesBaseCPUPrice: true,
  1030. }, err
  1031. }
  1032. terms, termsOk := aws.Pricing[key]
  1033. if !termsOk {
  1034. return &Node{
  1035. Cost: aws.BaseCPUPrice,
  1036. BaseCPUPrice: aws.BaseCPUPrice,
  1037. BaseRAMPrice: aws.BaseRAMPrice,
  1038. BaseGPUPrice: aws.BaseGPUPrice,
  1039. UsageType: usageType,
  1040. UsesBaseCPUPrice: true,
  1041. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  1042. }
  1043. return aws.createNode(terms, usageType, k)
  1044. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  1045. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  1046. }
  1047. }
  1048. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  1049. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  1050. defaultClusterName := "AWS Cluster #1"
  1051. c, err := awsProvider.GetConfig()
  1052. if err != nil {
  1053. return nil, err
  1054. }
  1055. remoteEnabled := env.IsRemoteEnabled()
  1056. if c.ClusterName != "" {
  1057. m := make(map[string]string)
  1058. m["name"] = c.ClusterName
  1059. m["provider"] = "AWS"
  1060. m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable else where
  1061. m["region"] = awsProvider.clusterRegion
  1062. m["id"] = env.GetClusterID()
  1063. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1064. m["provisioner"] = awsProvider.clusterProvisioner
  1065. return m, nil
  1066. }
  1067. makeStructure := func(clusterName string) (map[string]string, error) {
  1068. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  1069. m := make(map[string]string)
  1070. m["name"] = clusterName
  1071. m["provider"] = "AWS"
  1072. m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable else where
  1073. m["region"] = awsProvider.clusterRegion
  1074. m["id"] = env.GetClusterID()
  1075. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1076. return m, nil
  1077. }
  1078. maybeClusterId := env.GetAWSClusterID()
  1079. if len(maybeClusterId) != 0 {
  1080. return makeStructure(maybeClusterId)
  1081. }
  1082. // TODO: This should be cached, it can take a long time to hit the API
  1083. //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  1084. //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  1085. //klog.Infof("nodelist get here %s", time.Now())
  1086. //nodeList := awsProvider.Clientset.GetAllNodes()
  1087. //klog.Infof("nodelist done here %s", time.Now())
  1088. /*for _, n := range nodeList {
  1089. region := ""
  1090. instanceId := ""
  1091. providerId := n.Spec.ProviderID
  1092. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  1093. if matchNum == 1 {
  1094. region = group
  1095. } else if matchNum == 2 {
  1096. instanceId = group
  1097. }
  1098. }
  1099. if len(instanceId) == 0 {
  1100. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  1101. continue
  1102. }
  1103. c := &aws.Config{
  1104. Region: aws.String(region),
  1105. }
  1106. s := session.Must(session.NewSession(c))
  1107. ec2Svc := ec2.New(s)
  1108. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  1109. InstanceIds: []*string{
  1110. aws.String(instanceId),
  1111. },
  1112. })
  1113. if diErr != nil {
  1114. klog.Infof("Error describing instances: %s", diErr)
  1115. continue
  1116. }
  1117. if len(di.Reservations) != 1 {
  1118. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  1119. continue
  1120. }
  1121. res := di.Reservations[0]
  1122. if len(res.Instances) != 1 {
  1123. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  1124. continue
  1125. }
  1126. inst := res.Instances[0]
  1127. for _, tag := range inst.Tags {
  1128. tagKey := *tag.Key
  1129. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  1130. if matchNum != 1 {
  1131. continue
  1132. }
  1133. return makeStructure(group)
  1134. }
  1135. }
  1136. }*/
  1137. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  1138. return makeStructure(defaultClusterName)
  1139. }
  1140. // updates the authentication to the latest values (via config or secret)
  1141. func (aws *AWS) ConfigureAuth() error {
  1142. c, err := aws.Config.GetCustomPricingData()
  1143. if err != nil {
  1144. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  1145. }
  1146. return aws.ConfigureAuthWith(c)
  1147. }
  1148. // updates the authentication to the latest values (via config or secret)
  1149. func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
  1150. accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
  1151. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1152. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1153. if err != nil {
  1154. return err
  1155. }
  1156. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1157. if err != nil {
  1158. return err
  1159. }
  1160. }
  1161. return nil
  1162. }
  1163. // Gets the aws key id and secret
  1164. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  1165. if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
  1166. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1167. }
  1168. // 1. Check config values first (set from frontend UI)
  1169. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  1170. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1171. Message: "AWS ServiceKey exists",
  1172. Status: true,
  1173. }
  1174. return cp.ServiceKeyName, cp.ServiceKeySecret
  1175. }
  1176. // 2. Check for secret
  1177. s, _ := aws.loadAWSAuthSecret(forceReload)
  1178. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1179. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1180. Message: "AWS ServiceKey exists",
  1181. Status: true,
  1182. }
  1183. return s.AccessKeyID, s.SecretAccessKey
  1184. }
  1185. // 3. Fall back to env vars
  1186. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1187. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1188. Message: "AWS ServiceKey exists",
  1189. Status: false,
  1190. }
  1191. } else {
  1192. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1193. Message: "AWS ServiceKey exists",
  1194. Status: true,
  1195. }
  1196. }
  1197. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1198. }
  1199. // Load once and cache the result (even on failure). This is an install time secret, so
  1200. // we don't expect the secret to change. If it does, however, we can force reload using
  1201. // the input parameter.
  1202. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1203. if !force && loadedAWSSecret {
  1204. return awsSecret, nil
  1205. }
  1206. loadedAWSSecret = true
  1207. exists, err := fileutil.FileExists(authSecretPath)
  1208. if !exists || err != nil {
  1209. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1210. }
  1211. result, err := ioutil.ReadFile(authSecretPath)
  1212. if err != nil {
  1213. return nil, err
  1214. }
  1215. var ak AWSAccessKey
  1216. err = json.Unmarshal(result, &ak)
  1217. if err != nil {
  1218. return nil, err
  1219. }
  1220. awsSecret = &ak
  1221. return awsSecret, nil
  1222. }
  1223. func getClusterConfig(ccFile string) (map[string]string, error) {
  1224. clusterConfig, err := os.Open(ccFile)
  1225. if err != nil {
  1226. return nil, err
  1227. }
  1228. defer clusterConfig.Close()
  1229. b, err := ioutil.ReadAll(clusterConfig)
  1230. if err != nil {
  1231. return nil, err
  1232. }
  1233. var clusterConf map[string]string
  1234. err = json.Unmarshal([]byte(b), &clusterConf)
  1235. if err != nil {
  1236. return nil, err
  1237. }
  1238. return clusterConf, nil
  1239. }
  1240. func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
  1241. sess, err := session.NewSession(&aws.Config{
  1242. Region: aws.String(region),
  1243. Credentials: credentials.NewEnvCredentials(),
  1244. })
  1245. if err != nil {
  1246. return nil, err
  1247. }
  1248. ec2Svc := ec2.New(sess)
  1249. return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
  1250. }
  1251. func (a *AWS) GetAddresses() ([]byte, error) {
  1252. a.ConfigureAuth() // load authentication data into env vars
  1253. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1254. errorCh := make(chan error, len(awsRegions))
  1255. var wg sync.WaitGroup
  1256. wg.Add(len(awsRegions))
  1257. // Get volumes from each AWS region
  1258. for _, r := range awsRegions {
  1259. // Fetch IP address response and send results and errors to their
  1260. // respective channels
  1261. go func(region string) {
  1262. defer wg.Done()
  1263. defer errors.HandlePanic()
  1264. // Query for first page of volume results
  1265. resp, err := a.getAddressesForRegion(region)
  1266. if err != nil {
  1267. if aerr, ok := err.(awserr.Error); ok {
  1268. switch aerr.Code() {
  1269. default:
  1270. errorCh <- aerr
  1271. }
  1272. return
  1273. } else {
  1274. errorCh <- err
  1275. return
  1276. }
  1277. }
  1278. addressCh <- resp
  1279. }(r)
  1280. }
  1281. // Close the result channels after everything has been sent
  1282. go func() {
  1283. defer errors.HandlePanic()
  1284. wg.Wait()
  1285. close(errorCh)
  1286. close(addressCh)
  1287. }()
  1288. addresses := []*ec2.Address{}
  1289. for adds := range addressCh {
  1290. addresses = append(addresses, adds.Addresses...)
  1291. }
  1292. errors := []error{}
  1293. for err := range errorCh {
  1294. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1295. errors = append(errors, err)
  1296. }
  1297. // Return error if no addresses are returned
  1298. if len(errors) > 0 && len(addresses) == 0 {
  1299. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
  1300. }
  1301. // Format the response this way to match the JSON-encoded formatting of a single response
  1302. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1303. // a "Addresss" key at the top level.
  1304. return json.Marshal(map[string][]*ec2.Address{
  1305. "Addresses": addresses,
  1306. })
  1307. }
  1308. func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1309. sess, err := session.NewSession(&aws.Config{
  1310. Region: aws.String(region),
  1311. Credentials: credentials.NewEnvCredentials(),
  1312. })
  1313. if err != nil {
  1314. return nil, err
  1315. }
  1316. ec2Svc := ec2.New(sess)
  1317. return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
  1318. MaxResults: &maxResults,
  1319. NextToken: nextToken,
  1320. })
  1321. }
  1322. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1323. func (a *AWS) GetDisks() ([]byte, error) {
  1324. a.ConfigureAuth() // load authentication data into env vars
  1325. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1326. errorCh := make(chan error, len(awsRegions))
  1327. var wg sync.WaitGroup
  1328. wg.Add(len(awsRegions))
  1329. // Get volumes from each AWS region
  1330. for _, r := range awsRegions {
  1331. // Fetch volume response and send results and errors to their
  1332. // respective channels
  1333. go func(region string) {
  1334. defer wg.Done()
  1335. defer errors.HandlePanic()
  1336. // Query for first page of volume results
  1337. resp, err := a.getDisksForRegion(region, 1000, nil)
  1338. if err != nil {
  1339. if aerr, ok := err.(awserr.Error); ok {
  1340. switch aerr.Code() {
  1341. default:
  1342. errorCh <- aerr
  1343. }
  1344. return
  1345. } else {
  1346. errorCh <- err
  1347. return
  1348. }
  1349. }
  1350. volumeCh <- resp
  1351. // A NextToken indicates more pages of results. Keep querying
  1352. // until all pages are retrieved.
  1353. for resp.NextToken != nil {
  1354. resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
  1355. if err != nil {
  1356. if aerr, ok := err.(awserr.Error); ok {
  1357. switch aerr.Code() {
  1358. default:
  1359. errorCh <- aerr
  1360. }
  1361. return
  1362. } else {
  1363. errorCh <- err
  1364. return
  1365. }
  1366. }
  1367. volumeCh <- resp
  1368. }
  1369. }(r)
  1370. }
  1371. // Close the result channels after everything has been sent
  1372. go func() {
  1373. defer errors.HandlePanic()
  1374. wg.Wait()
  1375. close(errorCh)
  1376. close(volumeCh)
  1377. }()
  1378. volumes := []*ec2.Volume{}
  1379. for vols := range volumeCh {
  1380. volumes = append(volumes, vols.Volumes...)
  1381. }
  1382. errors := []error{}
  1383. for err := range errorCh {
  1384. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1385. errors = append(errors, err)
  1386. }
  1387. // Return error if no volumes are returned
  1388. if len(errors) > 0 && len(volumes) == 0 {
  1389. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
  1390. }
  1391. // Format the response this way to match the JSON-encoded formatting of a single response
  1392. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1393. // a "Volumes" key at the top level.
  1394. return json.Marshal(map[string][]*ec2.Volume{
  1395. "Volumes": volumes,
  1396. })
  1397. }
  1398. func generateAWSGroupBy(lastIdx int) string {
  1399. sequence := []string{}
  1400. for i := 1; i < lastIdx+1; i++ {
  1401. sequence = append(sequence, strconv.Itoa(i))
  1402. }
  1403. return strings.Join(sequence, ",")
  1404. }
  1405. func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput, *athena.Athena, error) {
  1406. customPricing, err := a.GetConfig()
  1407. if err != nil {
  1408. return nil, nil, err
  1409. }
  1410. a.ConfigureAuthWith(customPricing)
  1411. region := aws.String(customPricing.AthenaRegion)
  1412. resultsBucket := customPricing.AthenaBucketName
  1413. database := customPricing.AthenaDatabase
  1414. c := &aws.Config{
  1415. Region: region,
  1416. STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
  1417. }
  1418. s := session.Must(session.NewSession(c))
  1419. svc := athena.New(s)
  1420. if customPricing.MasterPayerARN != "" {
  1421. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1422. svc = athena.New(s, &aws.Config{
  1423. Region: region,
  1424. Credentials: creds,
  1425. })
  1426. }
  1427. var e athena.StartQueryExecutionInput
  1428. var r athena.ResultConfiguration
  1429. r.SetOutputLocation(resultsBucket)
  1430. e.SetResultConfiguration(&r)
  1431. e.SetQueryString(query)
  1432. var q athena.QueryExecutionContext
  1433. q.SetDatabase(database)
  1434. e.SetQueryExecutionContext(&q)
  1435. res, err := svc.StartQueryExecution(&e)
  1436. if err != nil {
  1437. return nil, svc, err
  1438. }
  1439. klog.V(2).Infof("StartQueryExecution result:")
  1440. klog.V(2).Infof(res.GoString())
  1441. var qri athena.GetQueryExecutionInput
  1442. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1443. var qrop *athena.GetQueryExecutionOutput
  1444. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1445. for {
  1446. qrop, err = svc.GetQueryExecution(&qri)
  1447. if err != nil {
  1448. return nil, svc, err
  1449. }
  1450. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1451. break
  1452. }
  1453. time.Sleep(duration)
  1454. }
  1455. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1456. var ip athena.GetQueryResultsInput
  1457. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1458. return &ip, svc, nil
  1459. } else {
  1460. return nil, svc, fmt.Errorf("No results available for %s", query)
  1461. }
  1462. }
  1463. func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
  1464. customPricing, err := a.GetConfig()
  1465. if err != nil {
  1466. return nil, err
  1467. }
  1468. a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
  1469. region := aws.String(customPricing.AthenaRegion)
  1470. resultsBucket := customPricing.AthenaBucketName
  1471. database := customPricing.AthenaDatabase
  1472. c := &aws.Config{
  1473. Region: region,
  1474. STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
  1475. }
  1476. s := session.Must(session.NewSession(c))
  1477. svc := athena.New(s)
  1478. if customPricing.MasterPayerARN != "" {
  1479. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1480. svc = athena.New(s, &aws.Config{
  1481. Region: region,
  1482. Credentials: creds,
  1483. })
  1484. }
  1485. var e athena.StartQueryExecutionInput
  1486. var r athena.ResultConfiguration
  1487. r.SetOutputLocation(resultsBucket)
  1488. e.SetResultConfiguration(&r)
  1489. e.SetQueryString(query)
  1490. var q athena.QueryExecutionContext
  1491. q.SetDatabase(database)
  1492. e.SetQueryExecutionContext(&q)
  1493. res, err := svc.StartQueryExecution(&e)
  1494. if err != nil {
  1495. return nil, err
  1496. }
  1497. klog.V(2).Infof("StartQueryExecution result:")
  1498. klog.V(2).Infof(res.GoString())
  1499. var qri athena.GetQueryExecutionInput
  1500. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1501. var qrop *athena.GetQueryExecutionOutput
  1502. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1503. for {
  1504. qrop, err = svc.GetQueryExecution(&qri)
  1505. if err != nil {
  1506. return nil, err
  1507. }
  1508. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1509. break
  1510. }
  1511. time.Sleep(duration)
  1512. }
  1513. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1514. var ip athena.GetQueryResultsInput
  1515. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1516. return svc.GetQueryResults(&ip)
  1517. } else {
  1518. return nil, fmt.Errorf("No results available for %s", query)
  1519. }
  1520. }
  1521. type SavingsPlanData struct {
  1522. ResourceID string
  1523. EffectiveCost float64
  1524. SavingsPlanARN string
  1525. MostRecentDate string
  1526. }
  1527. func (a *AWS) GetSavingsPlanDataFromAthena() error {
  1528. cfg, err := a.GetConfig()
  1529. if err != nil {
  1530. return err
  1531. }
  1532. if cfg.AthenaBucketName == "" {
  1533. return fmt.Errorf("No Athena Bucket configured")
  1534. }
  1535. if a.SavingsPlanDataByInstanceID == nil {
  1536. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1537. }
  1538. tNow := time.Now()
  1539. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1540. start := tOneDayAgo.Format("2006-01-02")
  1541. end := tNow.Format("2006-01-02")
  1542. // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
  1543. //
  1544. q := `SELECT
  1545. line_item_usage_start_date,
  1546. savings_plan_savings_plan_a_r_n,
  1547. line_item_resource_id,
  1548. savings_plan_savings_plan_rate
  1549. FROM %s as cost_data
  1550. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1551. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1552. line_item_usage_start_date DESC`
  1553. page := 0
  1554. processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
  1555. a.SavingsPlanDataLock.Lock()
  1556. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
  1557. mostRecentDate := ""
  1558. iter := op.ResultSet.Rows
  1559. if page == 0 && len(iter) > 0 {
  1560. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1561. }
  1562. page++
  1563. for _, r := range iter {
  1564. d := *r.Data[0].VarCharValue
  1565. if mostRecentDate == "" {
  1566. mostRecentDate = d
  1567. } else if mostRecentDate != d { // Get all most recent assignments
  1568. break
  1569. }
  1570. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1571. if err != nil {
  1572. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1573. }
  1574. r := &SavingsPlanData{
  1575. ResourceID: *r.Data[2].VarCharValue,
  1576. EffectiveCost: cost,
  1577. SavingsPlanARN: *r.Data[1].VarCharValue,
  1578. MostRecentDate: d,
  1579. }
  1580. a.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1581. }
  1582. klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
  1583. for k, r := range a.SavingsPlanDataByInstanceID {
  1584. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1585. }
  1586. a.SavingsPlanDataLock.Unlock()
  1587. return true
  1588. }
  1589. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1590. klog.V(3).Infof("Running Query: %s", query)
  1591. ip, svc, err := a.QueryAthenaPaginated(query)
  1592. if err != nil {
  1593. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1594. }
  1595. athenaErr := svc.GetQueryResultsPages(ip, processResults)
  1596. if athenaErr != nil {
  1597. return athenaErr
  1598. }
  1599. return nil
  1600. }
  1601. type RIData struct {
  1602. ResourceID string
  1603. EffectiveCost float64
  1604. ReservationARN string
  1605. MostRecentDate string
  1606. }
  1607. func (a *AWS) GetReservationDataFromAthena() error {
  1608. cfg, err := a.GetConfig()
  1609. if err != nil {
  1610. return err
  1611. }
  1612. if cfg.AthenaBucketName == "" {
  1613. return fmt.Errorf("No Athena Bucket configured")
  1614. }
  1615. // Query for all column names in advance in order to validate configured
  1616. // label columns
  1617. columns, _ := a.ShowAthenaColumns()
  1618. if columns["reservation_reservation_a_r_n"] && columns["reservation_effective_cost"] {
  1619. if a.RIPricingByInstanceID == nil {
  1620. a.RIPricingByInstanceID = make(map[string]*RIData)
  1621. }
  1622. tNow := time.Now()
  1623. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1624. start := tOneDayAgo.Format("2006-01-02")
  1625. end := tNow.Format("2006-01-02")
  1626. q := `SELECT
  1627. line_item_usage_start_date,
  1628. reservation_reservation_a_r_n,
  1629. line_item_resource_id,
  1630. reservation_effective_cost
  1631. FROM %s as cost_data
  1632. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1633. AND reservation_reservation_a_r_n <> '' ORDER BY
  1634. line_item_usage_start_date DESC`
  1635. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1636. op, err := a.QueryAthenaBillingData(query)
  1637. if err != nil {
  1638. a.RIPricingError = err
  1639. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1640. }
  1641. a.RIPricingError = nil
  1642. klog.Infof("Fetching RI data...")
  1643. if len(op.ResultSet.Rows) > 1 {
  1644. a.RIDataLock.Lock()
  1645. mostRecentDate := ""
  1646. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1647. d := *r.Data[0].VarCharValue
  1648. if mostRecentDate == "" {
  1649. mostRecentDate = d
  1650. } else if mostRecentDate != d { // Get all most recent assignments
  1651. break
  1652. }
  1653. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1654. if err != nil {
  1655. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1656. }
  1657. r := &RIData{
  1658. ResourceID: *r.Data[2].VarCharValue,
  1659. EffectiveCost: cost,
  1660. ReservationARN: *r.Data[1].VarCharValue,
  1661. MostRecentDate: d,
  1662. }
  1663. a.RIPricingByInstanceID[r.ResourceID] = r
  1664. }
  1665. klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
  1666. for k, r := range a.RIPricingByInstanceID {
  1667. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1668. }
  1669. a.RIDataLock.Unlock()
  1670. } else {
  1671. klog.Infof("No reserved instance data found")
  1672. }
  1673. } else {
  1674. klog.Infof("No reserved data available in Athena")
  1675. a.RIPricingError = nil
  1676. }
  1677. return nil
  1678. }
  1679. // ShowAthenaColumns returns a list of the names of all columns in the configured
  1680. // Athena tables
  1681. func (aws *AWS) ShowAthenaColumns() (map[string]bool, error) {
  1682. columnSet := map[string]bool{}
  1683. // Configure Athena query
  1684. cfg, err := aws.GetConfig()
  1685. if err != nil {
  1686. return nil, err
  1687. }
  1688. if cfg.AthenaTable == "" {
  1689. return nil, fmt.Errorf("AthenaTable not configured")
  1690. }
  1691. if cfg.AthenaBucketName == "" {
  1692. return nil, fmt.Errorf("AthenaBucketName not configured")
  1693. }
  1694. q := `SHOW COLUMNS IN %s`
  1695. query := fmt.Sprintf(q, cfg.AthenaTable)
  1696. results, svc, err := aws.QueryAthenaPaginated(query)
  1697. columns := []string{}
  1698. pageNum := 0
  1699. athenaErr := svc.GetQueryResultsPages(results, func(page *athena.GetQueryResultsOutput, lastpage bool) bool {
  1700. for _, row := range page.ResultSet.Rows {
  1701. columns = append(columns, *row.Data[0].VarCharValue)
  1702. }
  1703. pageNum++
  1704. return true
  1705. })
  1706. if athenaErr != nil {
  1707. log.Warningf("Error getting Athena columns: %s", err)
  1708. return columnSet, athenaErr
  1709. }
  1710. for _, col := range columns {
  1711. columnSet[col] = true
  1712. }
  1713. return columnSet, nil
  1714. }
  1715. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  1716. // "start" and "end" are dates of the format YYYY-MM-DD
  1717. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  1718. func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
  1719. customPricing, err := a.GetConfig()
  1720. if err != nil {
  1721. return nil, err
  1722. }
  1723. formattedAggregators := []string{}
  1724. for _, agg := range aggregators {
  1725. aggregator_column_name := "resource_tags_user_" + agg
  1726. aggregator_column_name = cloudutil.ConvertToGlueColumnFormat(aggregator_column_name)
  1727. formattedAggregators = append(formattedAggregators, aggregator_column_name)
  1728. }
  1729. aggregatorNames := strings.Join(formattedAggregators, ",")
  1730. aggregatorOr := strings.Join(formattedAggregators, " <> '' OR ")
  1731. aggregatorOr = aggregatorOr + " <> ''"
  1732. filter_column_name := "resource_tags_user_" + filterType
  1733. filter_column_name = cloudutil.ConvertToGlueColumnFormat(filter_column_name)
  1734. var query string
  1735. var lastIdx int
  1736. if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
  1737. lastIdx = len(formattedAggregators) + 3
  1738. groupby := generateAWSGroupBy(lastIdx)
  1739. query = fmt.Sprintf(`SELECT
  1740. CAST(line_item_usage_start_date AS DATE) as start_date,
  1741. %s,
  1742. line_item_product_code,
  1743. %s,
  1744. SUM(line_item_blended_cost) as blended_cost
  1745. FROM %s as cost_data
  1746. WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1747. GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, aggregatorOr, groupby)
  1748. } else {
  1749. lastIdx = len(formattedAggregators) + 2
  1750. groupby := generateAWSGroupBy(lastIdx)
  1751. query = fmt.Sprintf(`SELECT
  1752. CAST(line_item_usage_start_date AS DATE) as start_date,
  1753. %s,
  1754. line_item_product_code,
  1755. SUM(line_item_blended_cost) as blended_cost
  1756. FROM %s as cost_data
  1757. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1758. GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
  1759. }
  1760. var oocAllocs []*OutOfClusterAllocation
  1761. page := 0
  1762. processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
  1763. iter := op.ResultSet.Rows
  1764. if page == 0 && len(iter) > 0 {
  1765. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1766. }
  1767. page++
  1768. for _, r := range iter {
  1769. cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
  1770. if err != nil {
  1771. klog.Infof("Error converting cost `%s` from float ", *r.Data[lastIdx].VarCharValue)
  1772. }
  1773. environment := ""
  1774. for _, d := range r.Data[1 : len(formattedAggregators)+1] {
  1775. if *d.VarCharValue != "" {
  1776. environment = *d.VarCharValue // just set to the first nonempty match
  1777. }
  1778. break
  1779. }
  1780. ooc := &OutOfClusterAllocation{
  1781. Aggregator: strings.Join(aggregators, ","),
  1782. Environment: environment,
  1783. Service: *r.Data[len(formattedAggregators)+1].VarCharValue,
  1784. Cost: cost,
  1785. }
  1786. oocAllocs = append(oocAllocs, ooc)
  1787. }
  1788. return true
  1789. }
  1790. // Query for all column names in advance in order to validate configured
  1791. // label columns
  1792. columns, _ := a.ShowAthenaColumns()
  1793. // Check for all aggregators being formatted into the query
  1794. containsColumns := true
  1795. for _, agg := range formattedAggregators {
  1796. if columns[agg] != true {
  1797. containsColumns = false
  1798. klog.Warningf("Athena missing column: %s", agg)
  1799. }
  1800. }
  1801. if containsColumns {
  1802. klog.V(3).Infof("Running Query: %s", query)
  1803. ip, svc, _ := a.QueryAthenaPaginated(query)
  1804. athenaErr := svc.GetQueryResultsPages(ip, processResults)
  1805. if athenaErr != nil {
  1806. klog.Infof("RETURNING ATHENA ERROR")
  1807. return nil, athenaErr
  1808. }
  1809. if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
  1810. gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
  1811. if err != nil {
  1812. klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
  1813. }
  1814. gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
  1815. if err != nil {
  1816. klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
  1817. }
  1818. oocAllocs = append(oocAllocs, gcpOOC...)
  1819. }
  1820. } else {
  1821. klog.Infof("External Allocations: Athena Query skipped due to missing columns")
  1822. }
  1823. return oocAllocs, nil
  1824. }
  1825. // QuerySQL can query a properly configured Athena database.
  1826. // Used to fetch billing data.
  1827. // Requires a json config in /var/configs with key region, output, and database.
  1828. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1829. customPricing, err := a.GetConfig()
  1830. if err != nil {
  1831. return nil, err
  1832. }
  1833. a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
  1834. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1835. if err != nil {
  1836. return nil, err
  1837. }
  1838. defer athenaConfigs.Close()
  1839. b, err := ioutil.ReadAll(athenaConfigs)
  1840. if err != nil {
  1841. return nil, err
  1842. }
  1843. var athenaConf map[string]string
  1844. json.Unmarshal([]byte(b), &athenaConf)
  1845. region := aws.String(customPricing.AthenaRegion)
  1846. resultsBucket := customPricing.AthenaBucketName
  1847. database := customPricing.AthenaDatabase
  1848. c := &aws.Config{
  1849. Region: region,
  1850. }
  1851. s := session.Must(session.NewSession(c))
  1852. svc := athena.New(s)
  1853. var e athena.StartQueryExecutionInput
  1854. var r athena.ResultConfiguration
  1855. r.SetOutputLocation(resultsBucket)
  1856. e.SetResultConfiguration(&r)
  1857. e.SetQueryString(query)
  1858. var q athena.QueryExecutionContext
  1859. q.SetDatabase(database)
  1860. e.SetQueryExecutionContext(&q)
  1861. res, err := svc.StartQueryExecution(&e)
  1862. if err != nil {
  1863. return nil, err
  1864. }
  1865. klog.V(2).Infof("StartQueryExecution result:")
  1866. klog.V(2).Infof(res.GoString())
  1867. var qri athena.GetQueryExecutionInput
  1868. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1869. var qrop *athena.GetQueryExecutionOutput
  1870. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1871. for {
  1872. qrop, err = svc.GetQueryExecution(&qri)
  1873. if err != nil {
  1874. return nil, err
  1875. }
  1876. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1877. break
  1878. }
  1879. time.Sleep(duration)
  1880. }
  1881. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1882. var ip athena.GetQueryResultsInput
  1883. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1884. op, err := svc.GetQueryResults(&ip)
  1885. if err != nil {
  1886. return nil, err
  1887. }
  1888. b, err := json.Marshal(op.ResultSet)
  1889. if err != nil {
  1890. return nil, err
  1891. }
  1892. return b, nil
  1893. }
  1894. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1895. }
  1896. type spotInfo struct {
  1897. Timestamp string `csv:"Timestamp"`
  1898. UsageType string `csv:"UsageType"`
  1899. Operation string `csv:"Operation"`
  1900. InstanceID string `csv:"InstanceID"`
  1901. MyBidID string `csv:"MyBidID"`
  1902. MyMaxPrice string `csv:"MyMaxPrice"`
  1903. MarketPrice string `csv:"MarketPrice"`
  1904. Charge string `csv:"Charge"`
  1905. Version string `csv:"Version"`
  1906. }
  1907. type fnames []*string
  1908. func (f fnames) Len() int {
  1909. return len(f)
  1910. }
  1911. func (f fnames) Swap(i, j int) {
  1912. f[i], f[j] = f[j], f[i]
  1913. }
  1914. func (f fnames) Less(i, j int) bool {
  1915. key1 := strings.Split(*f[i], ".")
  1916. key2 := strings.Split(*f[j], ".")
  1917. t1, err := time.Parse("2006-01-02-15", key1[1])
  1918. if err != nil {
  1919. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1920. return false
  1921. }
  1922. t2, err := time.Parse("2006-01-02-15", key2[1])
  1923. if err != nil {
  1924. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1925. return false
  1926. }
  1927. return t1.Before(t2)
  1928. }
  1929. func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
  1930. if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
  1931. a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1932. }
  1933. a.ConfigureAuth() // configure aws api authentication by setting env vars
  1934. s3Prefix := projectID
  1935. if len(prefix) != 0 {
  1936. s3Prefix = prefix + "/" + s3Prefix
  1937. }
  1938. c := aws.NewConfig().WithRegion(region)
  1939. s := session.Must(session.NewSession(c))
  1940. s3Svc := s3.New(s)
  1941. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1942. tNow := time.Now()
  1943. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1944. ls := &s3.ListObjectsInput{
  1945. Bucket: aws.String(bucket),
  1946. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1947. }
  1948. ls2 := &s3.ListObjectsInput{
  1949. Bucket: aws.String(bucket),
  1950. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1951. }
  1952. lso, err := s3Svc.ListObjects(ls)
  1953. if err != nil {
  1954. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1955. Message: "Bucket List Permissions Available",
  1956. Status: false,
  1957. AdditionalInfo: err.Error(),
  1958. }
  1959. return nil, err
  1960. } else {
  1961. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1962. Message: "Bucket List Permissions Available",
  1963. Status: true,
  1964. }
  1965. }
  1966. lsoLen := len(lso.Contents)
  1967. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1968. if lsoLen == 0 {
  1969. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1970. }
  1971. lso2, err := s3Svc.ListObjects(ls2)
  1972. if err != nil {
  1973. return nil, err
  1974. }
  1975. lso2Len := len(lso2.Contents)
  1976. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1977. if lso2Len == 0 {
  1978. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1979. }
  1980. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1981. var keys []*string
  1982. for _, obj := range lso.Contents {
  1983. keys = append(keys, obj.Key)
  1984. }
  1985. for _, obj := range lso2.Contents {
  1986. keys = append(keys, obj.Key)
  1987. }
  1988. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1989. header, err := csvutil.Header(spotInfo{}, "csv")
  1990. if err != nil {
  1991. return nil, err
  1992. }
  1993. fieldsPerRecord := len(header)
  1994. spots := make(map[string]*spotInfo)
  1995. for _, key := range keys {
  1996. getObj := &s3.GetObjectInput{
  1997. Bucket: aws.String(bucket),
  1998. Key: key,
  1999. }
  2000. buf := aws.NewWriteAtBuffer([]byte{})
  2001. _, err := downloader.Download(buf, getObj)
  2002. if err != nil {
  2003. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  2004. Message: "Object Get Permissions Available",
  2005. Status: false,
  2006. AdditionalInfo: err.Error(),
  2007. }
  2008. return nil, err
  2009. } else {
  2010. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  2011. Message: "Object Get Permissions Available",
  2012. Status: true,
  2013. }
  2014. }
  2015. r := bytes.NewReader(buf.Bytes())
  2016. gr, err := gzip.NewReader(r)
  2017. if err != nil {
  2018. return nil, err
  2019. }
  2020. csvReader := csv.NewReader(gr)
  2021. csvReader.Comma = '\t'
  2022. csvReader.FieldsPerRecord = fieldsPerRecord
  2023. dec, err := csvutil.NewDecoder(csvReader, header...)
  2024. if err != nil {
  2025. return nil, err
  2026. }
  2027. var foundVersion string
  2028. for {
  2029. spot := spotInfo{}
  2030. err := dec.Decode(&spot)
  2031. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  2032. if err == io.EOF {
  2033. break
  2034. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  2035. rec := dec.Record()
  2036. // the first two "Record()" will be the comment lines
  2037. // and they show up as len() == 1
  2038. // the first of which is "#Version"
  2039. // the second of which is "#Fields: "
  2040. if len(rec) != 1 {
  2041. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  2042. continue
  2043. }
  2044. if len(foundVersion) == 0 {
  2045. spotFeedVersion := rec[0]
  2046. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  2047. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  2048. if matches != nil {
  2049. foundVersion = matches[1]
  2050. if foundVersion != supportedSpotFeedVersion {
  2051. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  2052. break
  2053. }
  2054. }
  2055. continue
  2056. } else if strings.Index(rec[0], "#") == 0 {
  2057. continue
  2058. } else {
  2059. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  2060. continue
  2061. }
  2062. } else if err != nil {
  2063. klog.V(2).Infof("Error during spot info decode: %+v", err)
  2064. continue
  2065. }
  2066. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  2067. spots[spot.InstanceID] = &spot
  2068. }
  2069. gr.Close()
  2070. }
  2071. return spots, nil
  2072. }
  2073. func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  2074. }
  2075. func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  2076. checks := []*ServiceAccountCheck{}
  2077. for _, v := range a.ServiceAccountChecks {
  2078. checks = append(checks, v)
  2079. }
  2080. return &ServiceAccountStatus{
  2081. Checks: checks,
  2082. }
  2083. }
  2084. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  2085. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  2086. }
  2087. func (aws *AWS) Regions() []string {
  2088. return awsRegions
  2089. }