awsprovider.go 67 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net/http"
  10. "os"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/aws/aws-sdk-go/aws/endpoints"
  17. "k8s.io/klog"
  18. "github.com/kubecost/cost-model/pkg/clustercache"
  19. "github.com/kubecost/cost-model/pkg/env"
  20. "github.com/kubecost/cost-model/pkg/errors"
  21. "github.com/kubecost/cost-model/pkg/log"
  22. "github.com/kubecost/cost-model/pkg/util"
  23. "github.com/kubecost/cost-model/pkg/util/cloudutil"
  24. "github.com/kubecost/cost-model/pkg/util/json"
  25. "github.com/aws/aws-sdk-go/aws"
  26. "github.com/aws/aws-sdk-go/aws/awserr"
  27. "github.com/aws/aws-sdk-go/aws/credentials"
  28. "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
  29. "github.com/aws/aws-sdk-go/aws/session"
  30. "github.com/aws/aws-sdk-go/service/athena"
  31. "github.com/aws/aws-sdk-go/service/ec2"
  32. "github.com/aws/aws-sdk-go/service/s3"
  33. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  34. "github.com/jszwec/csvutil"
  35. v1 "k8s.io/api/core/v1"
  36. )
  // Identifiers used by the AWS provider: the supported spot data feed version,
  // the update types understood by UpdateConfig, the usage-type suffix that
  // marks a preemptible (spot) pricing key, and the display names of the
  // pricing sources.
  const (
  	supportedSpotFeedVersion = "1"

  	// SpotInfoUpdateType and AthenaInfoUpdateType select which payload
  	// UpdateConfig decodes.
  	SpotInfoUpdateType   = "spotinfo"
  	AthenaInfoUpdateType = "athenainfo"

  	// PreemptibleType is the fourth component of a spot node's pricing key.
  	PreemptibleType = "preemptible"

  	// Names of the pricing sources.
  	APIPricingSource              = "Public API"
  	SpotPricingSource             = "Spot Data Feed"
  	ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
  )
  44. func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
  45. sources := make(map[string]*PricingSource)
  46. sps := &PricingSource{
  47. Name: SpotPricingSource,
  48. }
  49. sps.Error = aws.SpotPricingStatus
  50. if sps.Error != "" {
  51. sps.Available = false
  52. } else if len(aws.SpotPricingByInstanceID) > 0 {
  53. sps.Available = true
  54. } else {
  55. sps.Error = "No spot instances detected"
  56. }
  57. sources[SpotPricingSource] = sps
  58. rps := &PricingSource{
  59. Name: ReservedInstancePricingSource,
  60. }
  61. rps.Error = aws.RIPricingStatus
  62. if rps.Error != "" {
  63. rps.Available = false
  64. } else {
  65. rps.Available = true
  66. }
  67. sources[ReservedInstancePricingSource] = rps
  68. return sources
  69. }
  const (
  	// SpotRefreshDuration is how often spot data is refreshed.
  	SpotRefreshDuration = 15 * time.Minute

  	// defaultConfigPath is the default configuration path.
  	defaultConfigPath = "/var/configs/"
  )
  // awsRegions lists the AWS region identifiers known to this provider. The
  // order of the entries is preserved as declared.
  var awsRegions = []string{
  	// United States
  	"us-east-2",
  	"us-east-1",
  	"us-west-1",
  	"us-west-2",
  	// Asia Pacific
  	"ap-east-1",
  	"ap-south-1",
  	"ap-northeast-3",
  	"ap-northeast-2",
  	"ap-southeast-1",
  	"ap-southeast-2",
  	"ap-northeast-1",
  	// Canada
  	"ca-central-1",
  	// China
  	"cn-north-1",
  	"cn-northwest-1",
  	// Europe
  	"eu-central-1",
  	"eu-west-1",
  	"eu-west-2",
  	"eu-west-3",
  	"eu-north-1",
  	// Middle East
  	"me-south-1",
  	// South America
  	"sa-east-1",
  	// GovCloud
  	"us-gov-east-1",
  	"us-gov-west-1",
  }
  98. // AWS represents an Amazon Provider
  99. type AWS struct {
  100. Pricing map[string]*AWSProductTerms
  101. SpotPricingByInstanceID map[string]*spotInfo
  102. SpotPricingUpdatedAt *time.Time
  103. SpotRefreshRunning bool
  104. SpotPricingLock sync.RWMutex
  105. SpotPricingStatus string
  106. RIPricingByInstanceID map[string]*RIData
  107. RIPricingStatus string
  108. RIDataRunning bool
  109. RIDataLock sync.RWMutex
  110. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  111. SavingsPlanDataRunning bool
  112. SavingsPlanDataLock sync.RWMutex
  113. ValidPricingKeys map[string]bool
  114. Clientset clustercache.ClusterCache
  115. BaseCPUPrice string
  116. BaseRAMPrice string
  117. BaseGPUPrice string
  118. BaseSpotCPUPrice string
  119. BaseSpotRAMPrice string
  120. BaseSpotGPUPrice string
  121. SpotLabelName string
  122. SpotLabelValue string
  123. SpotDataRegion string
  124. SpotDataBucket string
  125. SpotDataPrefix string
  126. ProjectID string
  127. DownloadPricingDataLock sync.RWMutex
  128. Config *ProviderConfig
  129. ServiceAccountChecks map[string]*ServiceAccountCheck
  130. clusterManagementPrice float64
  131. clusterProvisioner string
  132. *CustomProvider
  133. }
  // AWSAccessKey is an AWS access key ID / secret access key pair. Its JSON
  // form uses the field names aws_access_key_id and aws_secret_access_key.
  type AWSAccessKey struct {
  	AccessKeyID     string `json:"aws_access_key_id"`
  	SecretAccessKey string `json:"aws_secret_access_key"`
  }
  138. // AWSPricing maps a k8s node to an AWS Pricing "product"
  139. type AWSPricing struct {
  140. Products map[string]*AWSProduct `json:"products"`
  141. Terms AWSPricingTerms `json:"terms"`
  142. }
  143. // AWSProduct represents a purchased SKU
  144. type AWSProduct struct {
  145. Sku string `json:"sku"`
  146. Attributes AWSProductAttributes `json:"attributes"`
  147. }
  // AWSProductAttributes represents metadata about the product used to map to
  // a node. All values are kept as the strings found in the pricing JSON.
  type AWSProductAttributes struct {
  	Location        string `json:"location"`
  	InstanceType    string `json:"instanceType"`
  	Memory          string `json:"memory"`
  	Storage         string `json:"storage"`
  	VCpu            string `json:"vcpu"`
  	UsageType       string `json:"usagetype"`
  	OperatingSystem string `json:"operatingSystem"`
  	PreInstalledSw  string `json:"preInstalledSw"`
  	InstanceFamily  string `json:"instanceFamily"`
  	CapacityStatus  string `json:"capacitystatus"`
  	// GPU represents the number of GPU on the instance.
  	GPU string `json:"gpu"`
  }
  162. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  163. type AWSPricingTerms struct {
  164. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  165. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  166. }
  167. // AWSOfferTerm is a sku extension used to pay for the node.
  168. type AWSOfferTerm struct {
  169. Sku string `json:"sku"`
  170. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  171. }
  172. func (ot *AWSOfferTerm) String() string {
  173. var strs []string
  174. for k, rc := range ot.PriceDimensions {
  175. strs = append(strs, fmt.Sprintf("%s:%s", k, rc.String()))
  176. }
  177. return fmt.Sprintf("%s:%s", ot.Sku, strings.Join(strs, ","))
  178. }
  179. // AWSRateCode encodes data about the price of a product
  180. type AWSRateCode struct {
  181. Unit string `json:"unit"`
  182. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  183. }
  184. func (rc *AWSRateCode) String() string {
  185. return fmt.Sprintf("{unit: %s, pricePerUnit: %v", rc.Unit, rc.PricePerUnit)
  186. }
  // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  // It currently carries a USD and a CNY amount, each omitted from JSON output
  // when empty.
  type AWSCurrencyCode struct {
  	USD string `json:"USD,omitempty"`
  	CNY string `json:"CNY,omitempty"`
  }
  192. // AWSProductTerms represents the full terms of the product
  193. type AWSProductTerms struct {
  194. Sku string `json:"sku"`
  195. OnDemand *AWSOfferTerm `json:"OnDemand"`
  196. Reserved *AWSOfferTerm `json:"Reserved"`
  197. Memory string `json:"memory"`
  198. Storage string `json:"storage"`
  199. VCpu string `json:"vcpu"`
  200. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  201. PV *PV `json:"pv"`
  202. }
  const (
  	// ClusterIdEnvVar is the environment variable in which one can manually
  	// set the ClusterId.
  	ClusterIdEnvVar = "AWS_CLUSTER_ID"

  	// OnDemandRateCode is appended to a node sku. OnDemandRateCodeCn is
  	// presumably its counterpart for the Chinese pricing data (usage is
  	// outside this view).
  	OnDemandRateCode   = ".JRTCKXETXF"
  	OnDemandRateCodeCn = ".99YE2YK9UR"

  	// ReservedRateCode is appended to a node sku.
  	ReservedRateCode = ".38NPMPTW36"

  	// HourlyRateCode is appended to a node sku. HourlyRateCodeCn is
  	// presumably its counterpart for the Chinese pricing data (usage is
  	// outside this view).
  	HourlyRateCode   = ".6YS6EN2CT7"
  	HourlyRateCodeCn = ".Q7UJUT2CE6"
  )
  // volTypes are used to map between AWS UsageTypes and EBS volume types, as
  // they would appear in K8s storage class name and the EC2 API. The table
  // holds both directions in a single map.
  var volTypes = map[string]string{
  	// UsageType -> EBS volume type
  	"EBS:VolumeUsage.gp2":    "gp2",
  	"EBS:VolumeUsage":        "standard",
  	"EBS:VolumeUsage.sc1":    "sc1",
  	"EBS:VolumeP-IOPS.piops": "io1",
  	"EBS:VolumeUsage.st1":    "st1",
  	"EBS:VolumeUsage.piops":  "io1",

  	// EBS volume type -> UsageType
  	"gp2":      "EBS:VolumeUsage.gp2",
  	"standard": "EBS:VolumeUsage",
  	"sc1":      "EBS:VolumeUsage.sc1",
  	"io1":      "EBS:VolumeUsage.piops",
  	"st1":      "EBS:VolumeUsage.st1",
  }
  // locationToRegion maps AWS region names (as they come from Billing) to
  // actual region identifiers.
  //
  // Every region listed in awsRegions needs an entry here; a location that is
  // missing from the map resolves to the empty string.
  var locationToRegion = map[string]string{
  	"US East (Ohio)":             "us-east-2",
  	"US East (N. Virginia)":      "us-east-1",
  	"US West (N. California)":    "us-west-1",
  	"US West (Oregon)":           "us-west-2",
  	"Asia Pacific (Hong Kong)":   "ap-east-1",
  	"Asia Pacific (Mumbai)":      "ap-south-1",
  	"Asia Pacific (Osaka-Local)": "ap-northeast-3",
  	"Asia Pacific (Seoul)":       "ap-northeast-2",
  	"Asia Pacific (Singapore)":   "ap-southeast-1",
  	"Asia Pacific (Sydney)":      "ap-southeast-2",
  	"Asia Pacific (Tokyo)":       "ap-northeast-1",
  	"Canada (Central)":           "ca-central-1",
  	"China (Beijing)":            "cn-north-1",
  	"China (Ningxia)":            "cn-northwest-1",
  	"EU (Frankfurt)":             "eu-central-1",
  	"EU (Ireland)":               "eu-west-1",
  	"EU (London)":                "eu-west-2",
  	"EU (Paris)":                 "eu-west-3",
  	"EU (Stockholm)":             "eu-north-1",
  	// me-south-1 is listed in awsRegions but previously had no location entry.
  	"Middle East (Bahrain)":     "me-south-1",
  	"South America (Sao Paulo)": "sa-east-1",
  	"AWS GovCloud (US-East)":    "us-gov-east-1",
  	"AWS GovCloud (US-West)":    "us-gov-west-1",
  }
  // regionToBillingRegionCode maps an AWS region identifier to the region code
  // used in billing data. us-east-1 and the Chinese regions map to the empty
  // string.
  //
  // Every region listed in awsRegions needs an entry here: a lookup for a
  // missing region also yields the empty string, which is indistinguishable
  // from the us-east-1 entry.
  var regionToBillingRegionCode = map[string]string{
  	"us-east-2":      "USE2",
  	"us-east-1":      "",
  	"us-west-1":      "USW1",
  	"us-west-2":      "USW2",
  	"ap-east-1":      "APE1",
  	"ap-south-1":     "APS3",
  	"ap-northeast-3": "APN3",
  	"ap-northeast-2": "APN2",
  	"ap-southeast-1": "APS1",
  	"ap-southeast-2": "APS2",
  	"ap-northeast-1": "APN1",
  	"ca-central-1":   "CAN1",
  	"cn-north-1":     "",
  	"cn-northwest-1": "",
  	"eu-central-1":   "EUC1",
  	"eu-west-1":      "EU",
  	"eu-west-2":      "EUW2",
  	"eu-west-3":      "EUW3",
  	"eu-north-1":     "EUN1",
  	// me-south-1 is listed in awsRegions but previously had no billing code.
  	"me-south-1":    "MES1",
  	"sa-east-1":     "SAE1",
  	"us-gov-east-1": "UGE1",
  	"us-gov-west-1": "UGW1",
  }
  279. var loadedAWSSecret bool = false
  280. var awsSecret *AWSAccessKey = nil
  281. func (aws *AWS) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
  282. return ""
  283. }
  284. // KubeAttrConversion maps the k8s labels for region to an aws region
  285. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  286. operatingSystem = strings.ToLower(operatingSystem)
  287. region := locationToRegion[location]
  288. return region + "," + instanceType + "," + operatingSystem
  289. }
  // AwsSpotFeedInfo is the JSON payload UpdateConfig decodes for a
  // SpotInfoUpdateType update. Note that AccountID travels under the JSON name
  // "projectID".
  type AwsSpotFeedInfo struct {
  	BucketName       string `json:"bucketName"`
  	Prefix           string `json:"prefix"`
  	Region           string `json:"region"`
  	AccountID        string `json:"projectID"`
  	ServiceKeyName   string `json:"serviceKeyName"`
  	ServiceKeySecret string `json:"serviceKeySecret"`
  	SpotLabel        string `json:"spotLabel"`
  	SpotLabelValue   string `json:"spotLabelValue"`
  }
  // AwsAthenaInfo is the JSON payload UpdateConfig decodes for an
  // AthenaInfoUpdateType update. Note that AccountID travels under the JSON
  // name "projectID".
  type AwsAthenaInfo struct {
  	AthenaBucketName string `json:"athenaBucketName"`
  	AthenaRegion     string `json:"athenaRegion"`
  	AthenaDatabase   string `json:"athenaDatabase"`
  	AthenaTable      string `json:"athenaTable"`
  	ServiceKeyName   string `json:"serviceKeyName"`
  	ServiceKeySecret string `json:"serviceKeySecret"`
  	AccountID        string `json:"projectID"`
  	MasterPayerARN   string `json:"masterPayerARN"`
  }
  310. func (aws *AWS) GetManagementPlatform() (string, error) {
  311. nodes := aws.Clientset.GetAllNodes()
  312. if len(nodes) > 0 {
  313. n := nodes[0]
  314. version := n.Status.NodeInfo.KubeletVersion
  315. if strings.Contains(version, "eks") {
  316. return "eks", nil
  317. }
  318. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  319. return "kops", nil
  320. }
  321. }
  322. return "", nil
  323. }
  324. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  325. c, err := aws.Config.GetCustomPricingData()
  326. if err != nil {
  327. return nil, err
  328. }
  329. if c.Discount == "" {
  330. c.Discount = "0%"
  331. }
  332. if c.NegotiatedDiscount == "" {
  333. c.NegotiatedDiscount = "0%"
  334. }
  335. if c.ShareTenancyCosts == "" {
  336. c.ShareTenancyCosts = defaultShareTenancyCost
  337. }
  338. return c, nil
  339. }
  340. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  341. return aws.Config.UpdateFromMap(a)
  342. }
  343. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  344. return aws.Config.Update(func(c *CustomPricing) error {
  345. if updateType == SpotInfoUpdateType {
  346. a := AwsSpotFeedInfo{}
  347. err := json.NewDecoder(r).Decode(&a)
  348. if err != nil {
  349. return err
  350. }
  351. c.ServiceKeyName = a.ServiceKeyName
  352. if a.ServiceKeySecret != "" {
  353. c.ServiceKeySecret = a.ServiceKeySecret
  354. }
  355. c.SpotDataPrefix = a.Prefix
  356. c.SpotDataBucket = a.BucketName
  357. c.ProjectID = a.AccountID
  358. c.SpotDataRegion = a.Region
  359. c.SpotLabel = a.SpotLabel
  360. c.SpotLabelValue = a.SpotLabelValue
  361. } else if updateType == AthenaInfoUpdateType {
  362. a := AwsAthenaInfo{}
  363. err := json.NewDecoder(r).Decode(&a)
  364. if err != nil {
  365. return err
  366. }
  367. c.AthenaBucketName = a.AthenaBucketName
  368. c.AthenaRegion = a.AthenaRegion
  369. c.AthenaDatabase = a.AthenaDatabase
  370. c.AthenaTable = a.AthenaTable
  371. c.ServiceKeyName = a.ServiceKeyName
  372. if a.ServiceKeySecret != "" {
  373. c.ServiceKeySecret = a.ServiceKeySecret
  374. }
  375. if a.MasterPayerARN != "" {
  376. c.MasterPayerARN = a.MasterPayerARN
  377. }
  378. c.AthenaProjectID = a.AccountID
  379. } else {
  380. a := make(map[string]interface{})
  381. err := json.NewDecoder(r).Decode(&a)
  382. if err != nil {
  383. return err
  384. }
  385. for k, v := range a {
  386. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  387. vstr, ok := v.(string)
  388. if ok {
  389. err := SetCustomPricingField(c, kUpper, vstr)
  390. if err != nil {
  391. return err
  392. }
  393. } else {
  394. return fmt.Errorf("type error while updating config for %s", kUpper)
  395. }
  396. }
  397. }
  398. if env.IsRemoteEnabled() {
  399. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  400. if err != nil {
  401. return err
  402. }
  403. }
  404. return nil
  405. })
  406. }
  // awsKey carries what is needed to build a node's pricing key: the node
  // labels, the configured spot label name/value used to recognise spot nodes,
  // and the provider ID from which the instance ID is extracted.
  type awsKey struct {
  	SpotLabelName  string
  	SpotLabelValue string
  	Labels         map[string]string
  	ProviderID     string
  }
  413. func (k *awsKey) GPUType() string {
  414. return ""
  415. }
  416. func (k *awsKey) ID() string {
  417. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  418. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  419. if matchNum == 2 {
  420. return group
  421. }
  422. }
  423. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  424. return ""
  425. }
  426. func (k *awsKey) Features() string {
  427. instanceType, _ := util.GetInstanceType(k.Labels)
  428. operatingSystem, _ := util.GetOperatingSystem(k.Labels)
  429. region, _ := util.GetRegion(k.Labels)
  430. key := region + "," + instanceType + "," + operatingSystem
  431. usageType := PreemptibleType
  432. spotKey := key + "," + usageType
  433. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  434. return spotKey
  435. }
  436. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  437. return spotKey
  438. }
  439. return key
  440. }
  441. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  442. pricing, ok := aws.Pricing[pvk.Features()]
  443. if !ok {
  444. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  445. return &PV{}, nil
  446. }
  447. return pricing.PV, nil
  448. }
  // awsPVKey carries what is needed to price a persistent volume: its labels,
  // the name and parameters of its storage class, the volume's own name, a
  // default region, and the provider-side volume ID.
  type awsPVKey struct {
  	Labels                 map[string]string
  	StorageClassParameters map[string]string
  	StorageClassName       string
  	Name                   string
  	DefaultRegion          string
  	ProviderID             string
  }
  457. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  458. providerID := ""
  459. if pv.Spec.AWSElasticBlockStore != nil {
  460. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  461. } else if pv.Spec.CSI != nil {
  462. providerID = pv.Spec.CSI.VolumeHandle
  463. }
  464. return &awsPVKey{
  465. Labels: pv.Labels,
  466. StorageClassName: pv.Spec.StorageClassName,
  467. StorageClassParameters: parameters,
  468. Name: pv.Name,
  469. DefaultRegion: defaultRegion,
  470. ProviderID: providerID,
  471. }
  472. }
  473. func (key *awsPVKey) ID() string {
  474. return key.ProviderID
  475. }
  476. func (key *awsPVKey) GetStorageClass() string {
  477. return key.StorageClassName
  478. }
  479. func (key *awsPVKey) Features() string {
  480. storageClass := key.StorageClassParameters["type"]
  481. if storageClass == "standard" {
  482. storageClass = "gp2"
  483. }
  484. // Storage class names are generally EBS volume types (gp2)
  485. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  486. // Converts between the 2
  487. region, _ := util.GetRegion(key.Labels)
  488. //if region == "" {
  489. // region = "us-east-1"
  490. //}
  491. class, ok := volTypes[storageClass]
  492. if !ok {
  493. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  494. }
  495. return region + "," + class
  496. }
  497. // GetKey maps node labels to information needed to retrieve pricing data
  498. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  499. return &awsKey{
  500. SpotLabelName: aws.SpotLabelName,
  501. SpotLabelValue: aws.SpotLabelValue,
  502. Labels: labels,
  503. ProviderID: labels["providerID"],
  504. }
  505. }
  506. func (aws *AWS) isPreemptible(key string) bool {
  507. s := strings.Split(key, ",")
  508. if len(s) == 4 && s[3] == PreemptibleType {
  509. return true
  510. }
  511. return false
  512. }
  513. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  514. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  515. }
  516. // Use the pricing data from the current region. Fall back to using all region data if needed.
  517. func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, error) {
  518. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/"
  519. region := ""
  520. multiregion := false
  521. for _, n := range nodeList {
  522. labels := n.GetLabels()
  523. currentNodeRegion := ""
  524. if r, ok := util.GetRegion(labels); ok {
  525. currentNodeRegion = r
  526. // Switch to Chinese endpoint for regions with the Chinese prefix
  527. if strings.HasPrefix(currentNodeRegion, "cn-") {
  528. pricingURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/AmazonEC2/current/"
  529. }
  530. } else {
  531. multiregion = true // We weren't able to detect the node's region, so pull all data.
  532. break
  533. }
  534. if region == "" { // We haven't set a region yet
  535. region = currentNodeRegion
  536. } else if region != "" && currentNodeRegion != region { // If two nodes have different regions here, we'll need to fetch all pricing data.
  537. multiregion = true
  538. break
  539. }
  540. }
  541. // Chinese multiregion endpoint only contains data for Chinese regions and Chinese regions are excluded from other endpoint
  542. if region != "" && !multiregion {
  543. pricingURL += region + "/"
  544. }
  545. pricingURL += "index.json"
  546. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  547. resp, err := http.Get(pricingURL)
  548. if err != nil {
  549. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  550. return nil, pricingURL, err
  551. }
  552. return resp, pricingURL, err
  553. }
  554. // DownloadPricingData fetches data from the AWS Pricing API
  555. func (aws *AWS) DownloadPricingData() error {
  556. aws.DownloadPricingDataLock.Lock()
  557. defer aws.DownloadPricingDataLock.Unlock()
  558. if aws.ServiceAccountChecks == nil {
  559. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  560. }
  561. c, err := aws.Config.GetCustomPricingData()
  562. if err != nil {
  563. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  564. }
  565. aws.BaseCPUPrice = c.CPU
  566. aws.BaseRAMPrice = c.RAM
  567. aws.BaseGPUPrice = c.GPU
  568. aws.BaseSpotCPUPrice = c.SpotCPU
  569. aws.BaseSpotRAMPrice = c.SpotRAM
  570. aws.BaseSpotGPUPrice = c.SpotGPU
  571. aws.SpotLabelName = c.SpotLabel
  572. aws.SpotLabelValue = c.SpotLabelValue
  573. aws.SpotDataBucket = c.SpotDataBucket
  574. aws.SpotDataPrefix = c.SpotDataPrefix
  575. aws.ProjectID = c.ProjectID
  576. aws.SpotDataRegion = c.SpotDataRegion
  577. aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
  578. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  579. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  580. }
  581. nodeList := aws.Clientset.GetAllNodes()
  582. inputkeys := make(map[string]bool)
  583. for _, n := range nodeList {
  584. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  585. aws.clusterManagementPrice = 0.10
  586. aws.clusterProvisioner = "EKS"
  587. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  588. aws.clusterProvisioner = "KOPS"
  589. }
  590. labels := n.GetObjectMeta().GetLabels()
  591. key := aws.GetKey(labels, n)
  592. inputkeys[key.Features()] = true
  593. }
  594. pvList := aws.Clientset.GetAllPersistentVolumes()
  595. storageClasses := aws.Clientset.GetAllStorageClasses()
  596. storageClassMap := make(map[string]map[string]string)
  597. for _, storageClass := range storageClasses {
  598. params := storageClass.Parameters
  599. storageClassMap[storageClass.ObjectMeta.Name] = params
  600. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  601. storageClassMap["default"] = params
  602. storageClassMap[""] = params
  603. }
  604. }
  605. pvkeys := make(map[string]PVKey)
  606. for _, pv := range pvList {
  607. params, ok := storageClassMap[pv.Spec.StorageClassName]
  608. if !ok {
  609. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  610. continue
  611. }
  612. key := aws.GetPVKey(pv, params, "")
  613. pvkeys[key.Features()] = key
  614. }
  615. // RIDataRunning establishes the existance of the goroutine. Since it's possible we
  616. // run multiple downloads, we don't want to create multiple go routines if one already exists
  617. if !aws.RIDataRunning && c.AthenaBucketName != "" {
  618. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  619. if err != nil {
  620. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  621. } else { // If we make one successful run, check on new reservation data every hour
  622. go func() {
  623. defer errors.HandlePanic()
  624. aws.RIDataRunning = true
  625. for {
  626. klog.Infof("Reserved Instance watcher running... next update in 1h")
  627. time.Sleep(time.Hour)
  628. err := aws.GetReservationDataFromAthena()
  629. if err != nil {
  630. klog.Infof("Error updating RI data: %s", err.Error())
  631. }
  632. }
  633. }()
  634. }
  635. }
  636. if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
  637. err = aws.GetSavingsPlanDataFromAthena()
  638. if err != nil {
  639. klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
  640. } else {
  641. go func() {
  642. defer errors.HandlePanic()
  643. aws.SavingsPlanDataRunning = true
  644. for {
  645. klog.Infof("Savings Plan watcher running... next update in 1h")
  646. time.Sleep(time.Hour)
  647. err := aws.GetSavingsPlanDataFromAthena()
  648. if err != nil {
  649. klog.Infof("Error updating Savings Plan data: %s", err.Error())
  650. }
  651. }
  652. }()
  653. }
  654. }
  655. aws.Pricing = make(map[string]*AWSProductTerms)
  656. aws.ValidPricingKeys = make(map[string]bool)
  657. skusToKeys := make(map[string]string)
  658. resp, pricingURL, err := aws.getRegionPricing(nodeList)
  659. if err != nil {
  660. return err
  661. }
  662. dec := json.NewDecoder(resp.Body)
  663. for {
  664. t, err := dec.Token()
  665. if err == io.EOF {
  666. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  667. break
  668. } else if err != nil {
  669. klog.V(2).Infof("error parsing response json %v", resp.Body)
  670. break
  671. }
  672. if t == "products" {
  673. _, err := dec.Token() // this should parse the opening "{""
  674. if err != nil {
  675. return err
  676. }
  677. for dec.More() {
  678. _, err := dec.Token() // the sku token
  679. if err != nil {
  680. return err
  681. }
  682. product := &AWSProduct{}
  683. err = dec.Decode(&product)
  684. if err != nil {
  685. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  686. break
  687. }
  688. if product.Attributes.PreInstalledSw == "NA" &&
  689. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
  690. product.Attributes.CapacityStatus == "Used" {
  691. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  692. spotKey := key + ",preemptible"
  693. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  694. productTerms := &AWSProductTerms{
  695. Sku: product.Sku,
  696. Memory: product.Attributes.Memory,
  697. Storage: product.Attributes.Storage,
  698. VCpu: product.Attributes.VCpu,
  699. GPU: product.Attributes.GPU,
  700. }
  701. aws.Pricing[key] = productTerms
  702. aws.Pricing[spotKey] = productTerms
  703. skusToKeys[product.Sku] = key
  704. }
  705. aws.ValidPricingKeys[key] = true
  706. aws.ValidPricingKeys[spotKey] = true
  707. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  708. // UsageTypes may be prefixed with a region code - we're removing this when using
  709. // volTypes to keep lookups generic
  710. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  711. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  712. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  713. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  714. spotKey := key + ",preemptible"
  715. pv := &PV{
  716. Class: volTypes[usageTypeNoRegion],
  717. Region: locationToRegion[product.Attributes.Location],
  718. }
  719. productTerms := &AWSProductTerms{
  720. Sku: product.Sku,
  721. PV: pv,
  722. }
  723. aws.Pricing[key] = productTerms
  724. aws.Pricing[spotKey] = productTerms
  725. skusToKeys[product.Sku] = key
  726. aws.ValidPricingKeys[key] = true
  727. aws.ValidPricingKeys[spotKey] = true
  728. }
  729. }
  730. }
  731. if t == "terms" {
  732. _, err := dec.Token() // this should parse the opening "{""
  733. if err != nil {
  734. return err
  735. }
  736. termType, err := dec.Token()
  737. if err != nil {
  738. return err
  739. }
  740. if termType == "OnDemand" {
  741. _, err := dec.Token()
  742. if err != nil { // again, should parse an opening "{"
  743. return err
  744. }
  745. for dec.More() {
  746. sku, err := dec.Token()
  747. if err != nil {
  748. return err
  749. }
  750. _, err = dec.Token() // another opening "{"
  751. if err != nil {
  752. return err
  753. }
  754. skuOnDemand, err := dec.Token()
  755. if err != nil {
  756. return err
  757. }
  758. offerTerm := &AWSOfferTerm{}
  759. err = dec.Decode(&offerTerm)
  760. if err != nil {
  761. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  762. }
  763. key, ok := skusToKeys[sku.(string)]
  764. spotKey := key + ",preemptible"
  765. if ok {
  766. aws.Pricing[key].OnDemand = offerTerm
  767. aws.Pricing[spotKey].OnDemand = offerTerm
  768. var cost string
  769. if sku.(string)+OnDemandRateCode == skuOnDemand {
  770. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  771. } else if sku.(string)+OnDemandRateCodeCn == skuOnDemand {
  772. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCodeCn+HourlyRateCodeCn].PricePerUnit.CNY
  773. }
  774. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  775. // If the specific UsageType is the per IO cost used on io1 volumes
  776. // we need to add the per IO cost to the io1 PV cost
  777. // Add the per IO cost to the PV object for the io1 volume type
  778. aws.Pricing[key].PV.CostPerIO = cost
  779. } else if strings.Contains(key, "EBS:Volume") {
  780. // If volume, we need to get hourly cost and add it to the PV object
  781. costFloat, _ := strconv.ParseFloat(cost, 64)
  782. hourlyPrice := costFloat / 730
  783. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  784. }
  785. }
  786. _, err = dec.Token()
  787. if err != nil {
  788. return err
  789. }
  790. }
  791. _, err = dec.Token()
  792. if err != nil {
  793. return err
  794. }
  795. }
  796. }
  797. }
  798. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  799. // Always run spot pricing refresh when performing download
  800. aws.refreshSpotPricing(true)
  801. // Only start a single refresh goroutine
  802. if !aws.SpotRefreshRunning {
  803. aws.SpotRefreshRunning = true
  804. go func() {
  805. defer errors.HandlePanic()
  806. for {
  807. klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  808. time.Sleep(SpotRefreshDuration)
  809. // Reoccurring refresh checks update times
  810. aws.refreshSpotPricing(false)
  811. }
  812. }()
  813. }
  814. return nil
  815. }
  816. func (aws *AWS) refreshSpotPricing(force bool) {
  817. aws.SpotPricingLock.Lock()
  818. defer aws.SpotPricingLock.Unlock()
  819. now := time.Now().UTC()
  820. updateTime := now.Add(-SpotRefreshDuration)
  821. // Return if there was an update time set and an hour hasn't elapsed
  822. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  823. return
  824. }
  825. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
  826. if err != nil {
  827. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  828. aws.SpotPricingStatus = err.Error()
  829. return
  830. }
  831. aws.SpotPricingStatus = ""
  832. // update time last updated
  833. aws.SpotPricingUpdatedAt = &now
  834. aws.SpotPricingByInstanceID = sp
  835. }
  836. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  837. func (aws *AWS) NetworkPricing() (*Network, error) {
  838. cpricing, err := aws.Config.GetCustomPricingData()
  839. if err != nil {
  840. return nil, err
  841. }
  842. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  843. if err != nil {
  844. return nil, err
  845. }
  846. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  847. if err != nil {
  848. return nil, err
  849. }
  850. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  851. if err != nil {
  852. return nil, err
  853. }
  854. return &Network{
  855. ZoneNetworkEgressCost: znec,
  856. RegionNetworkEgressCost: rnec,
  857. InternetNetworkEgressCost: inec,
  858. }, nil
  859. }
  860. func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
  861. fffrc := 0.025
  862. afrc := 0.010
  863. lbidc := 0.008
  864. numForwardingRules := 1.0
  865. dataIngressGB := 0.0
  866. var totalCost float64
  867. if numForwardingRules < 5 {
  868. totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
  869. } else {
  870. totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
  871. }
  872. return &LoadBalancer{
  873. Cost: totalCost,
  874. }, nil
  875. }
  876. // AllNodePricing returns all the billing data fetched.
  877. func (aws *AWS) AllNodePricing() (interface{}, error) {
  878. aws.DownloadPricingDataLock.RLock()
  879. defer aws.DownloadPricingDataLock.RUnlock()
  880. return aws.Pricing, nil
  881. }
  882. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  883. aws.SpotPricingLock.RLock()
  884. defer aws.SpotPricingLock.RUnlock()
  885. info, ok := aws.SpotPricingByInstanceID[instanceID]
  886. return info, ok
  887. }
  888. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  889. aws.RIDataLock.RLock()
  890. defer aws.RIDataLock.RUnlock()
  891. data, ok := aws.RIPricingByInstanceID[instanceID]
  892. return data, ok
  893. }
  894. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  895. aws.SavingsPlanDataLock.RLock()
  896. defer aws.SavingsPlanDataLock.RUnlock()
  897. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  898. return data, ok
  899. }
  900. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  901. key := k.Features()
  902. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  903. var spotcost string
  904. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  905. arr := strings.Split(spotInfo.Charge, " ")
  906. if len(arr) == 2 {
  907. spotcost = arr[0]
  908. } else {
  909. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  910. }
  911. return &Node{
  912. Cost: spotcost,
  913. VCPU: terms.VCpu,
  914. RAM: terms.Memory,
  915. GPU: terms.GPU,
  916. Storage: terms.Storage,
  917. BaseCPUPrice: aws.BaseCPUPrice,
  918. BaseRAMPrice: aws.BaseRAMPrice,
  919. BaseGPUPrice: aws.BaseGPUPrice,
  920. UsageType: PreemptibleType,
  921. }, nil
  922. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  923. log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
  924. return &Node{
  925. VCPU: terms.VCpu,
  926. VCPUCost: aws.BaseSpotCPUPrice,
  927. RAM: terms.Memory,
  928. GPU: terms.GPU,
  929. Storage: terms.Storage,
  930. BaseCPUPrice: aws.BaseCPUPrice,
  931. BaseRAMPrice: aws.BaseRAMPrice,
  932. BaseGPUPrice: aws.BaseGPUPrice,
  933. UsageType: PreemptibleType,
  934. }, nil
  935. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  936. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  937. return &Node{
  938. Cost: strCost,
  939. VCPU: terms.VCpu,
  940. RAM: terms.Memory,
  941. GPU: terms.GPU,
  942. Storage: terms.Storage,
  943. BaseCPUPrice: aws.BaseCPUPrice,
  944. BaseRAMPrice: aws.BaseRAMPrice,
  945. BaseGPUPrice: aws.BaseGPUPrice,
  946. UsageType: usageType,
  947. }, nil
  948. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  949. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  950. return &Node{
  951. Cost: strCost,
  952. VCPU: terms.VCpu,
  953. RAM: terms.Memory,
  954. GPU: terms.GPU,
  955. Storage: terms.Storage,
  956. BaseCPUPrice: aws.BaseCPUPrice,
  957. BaseRAMPrice: aws.BaseRAMPrice,
  958. BaseGPUPrice: aws.BaseGPUPrice,
  959. UsageType: usageType,
  960. }, nil
  961. }
  962. var cost string
  963. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  964. if ok {
  965. cost = c.PricePerUnit.USD
  966. } else {
  967. // Check for Chinese pricing before throwing error
  968. c, ok = terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCodeCn+HourlyRateCodeCn]
  969. if ok {
  970. cost = c.PricePerUnit.CNY
  971. } else {
  972. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  973. }
  974. }
  975. return &Node{
  976. Cost: cost,
  977. VCPU: terms.VCpu,
  978. RAM: terms.Memory,
  979. GPU: terms.GPU,
  980. Storage: terms.Storage,
  981. BaseCPUPrice: aws.BaseCPUPrice,
  982. BaseRAMPrice: aws.BaseRAMPrice,
  983. BaseGPUPrice: aws.BaseGPUPrice,
  984. UsageType: usageType,
  985. }, nil
  986. }
  987. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  988. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  989. aws.DownloadPricingDataLock.RLock()
  990. defer aws.DownloadPricingDataLock.RUnlock()
  991. key := k.Features()
  992. usageType := "ondemand"
  993. if aws.isPreemptible(key) {
  994. usageType = PreemptibleType
  995. }
  996. terms, ok := aws.Pricing[key]
  997. if ok {
  998. return aws.createNode(terms, usageType, k)
  999. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  1000. aws.DownloadPricingDataLock.RUnlock()
  1001. err := aws.DownloadPricingData()
  1002. aws.DownloadPricingDataLock.RLock()
  1003. if err != nil {
  1004. return &Node{
  1005. Cost: aws.BaseCPUPrice,
  1006. BaseCPUPrice: aws.BaseCPUPrice,
  1007. BaseRAMPrice: aws.BaseRAMPrice,
  1008. BaseGPUPrice: aws.BaseGPUPrice,
  1009. UsageType: usageType,
  1010. UsesBaseCPUPrice: true,
  1011. }, err
  1012. }
  1013. terms, termsOk := aws.Pricing[key]
  1014. if !termsOk {
  1015. return &Node{
  1016. Cost: aws.BaseCPUPrice,
  1017. BaseCPUPrice: aws.BaseCPUPrice,
  1018. BaseRAMPrice: aws.BaseRAMPrice,
  1019. BaseGPUPrice: aws.BaseGPUPrice,
  1020. UsageType: usageType,
  1021. UsesBaseCPUPrice: true,
  1022. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  1023. }
  1024. return aws.createNode(terms, usageType, k)
  1025. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  1026. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  1027. }
  1028. }
  1029. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  1030. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  1031. defaultClusterName := "AWS Cluster #1"
  1032. c, err := awsProvider.GetConfig()
  1033. if err != nil {
  1034. return nil, err
  1035. }
  1036. remoteEnabled := env.IsRemoteEnabled()
  1037. if c.ClusterName != "" {
  1038. m := make(map[string]string)
  1039. m["name"] = c.ClusterName
  1040. m["provider"] = "AWS"
  1041. m["id"] = env.GetClusterID()
  1042. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1043. m["provisioner"] = awsProvider.clusterProvisioner
  1044. return m, nil
  1045. }
  1046. makeStructure := func(clusterName string) (map[string]string, error) {
  1047. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  1048. m := make(map[string]string)
  1049. m["name"] = clusterName
  1050. m["provider"] = "AWS"
  1051. m["id"] = env.GetClusterID()
  1052. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1053. return m, nil
  1054. }
  1055. maybeClusterId := env.GetAWSClusterID()
  1056. if len(maybeClusterId) != 0 {
  1057. return makeStructure(maybeClusterId)
  1058. }
  1059. // TODO: This should be cached, it can take a long time to hit the API
  1060. //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  1061. //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  1062. //klog.Infof("nodelist get here %s", time.Now())
  1063. //nodeList := awsProvider.Clientset.GetAllNodes()
  1064. //klog.Infof("nodelist done here %s", time.Now())
  1065. /*for _, n := range nodeList {
  1066. region := ""
  1067. instanceId := ""
  1068. providerId := n.Spec.ProviderID
  1069. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  1070. if matchNum == 1 {
  1071. region = group
  1072. } else if matchNum == 2 {
  1073. instanceId = group
  1074. }
  1075. }
  1076. if len(instanceId) == 0 {
  1077. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  1078. continue
  1079. }
  1080. c := &aws.Config{
  1081. Region: aws.String(region),
  1082. }
  1083. s := session.Must(session.NewSession(c))
  1084. ec2Svc := ec2.New(s)
  1085. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  1086. InstanceIds: []*string{
  1087. aws.String(instanceId),
  1088. },
  1089. })
  1090. if diErr != nil {
  1091. klog.Infof("Error describing instances: %s", diErr)
  1092. continue
  1093. }
  1094. if len(di.Reservations) != 1 {
  1095. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  1096. continue
  1097. }
  1098. res := di.Reservations[0]
  1099. if len(res.Instances) != 1 {
  1100. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  1101. continue
  1102. }
  1103. inst := res.Instances[0]
  1104. for _, tag := range inst.Tags {
  1105. tagKey := *tag.Key
  1106. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  1107. if matchNum != 1 {
  1108. continue
  1109. }
  1110. return makeStructure(group)
  1111. }
  1112. }
  1113. }*/
  1114. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  1115. return makeStructure(defaultClusterName)
  1116. }
  1117. // updates the authentication to the latest values (via config or secret)
  1118. func (aws *AWS) ConfigureAuth() error {
  1119. c, err := aws.Config.GetCustomPricingData()
  1120. if err != nil {
  1121. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  1122. }
  1123. return aws.ConfigureAuthWith(c)
  1124. }
  1125. // updates the authentication to the latest values (via config or secret)
  1126. func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
  1127. accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
  1128. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1129. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1130. if err != nil {
  1131. return err
  1132. }
  1133. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1134. if err != nil {
  1135. return err
  1136. }
  1137. }
  1138. return nil
  1139. }
  1140. // Gets the aws key id and secret
  1141. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  1142. if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
  1143. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1144. }
  1145. // 1. Check config values first (set from frontend UI)
  1146. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  1147. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1148. Message: "AWS ServiceKey exists",
  1149. Status: true,
  1150. }
  1151. return cp.ServiceKeyName, cp.ServiceKeySecret
  1152. }
  1153. // 2. Check for secret
  1154. s, _ := aws.loadAWSAuthSecret(forceReload)
  1155. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1156. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1157. Message: "AWS ServiceKey exists",
  1158. Status: true,
  1159. }
  1160. return s.AccessKeyID, s.SecretAccessKey
  1161. }
  1162. // 3. Fall back to env vars
  1163. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1164. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1165. Message: "AWS ServiceKey exists",
  1166. Status: false,
  1167. }
  1168. } else {
  1169. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1170. Message: "AWS ServiceKey exists",
  1171. Status: true,
  1172. }
  1173. }
  1174. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1175. }
  1176. // Load once and cache the result (even on failure). This is an install time secret, so
  1177. // we don't expect the secret to change. If it does, however, we can force reload using
  1178. // the input parameter.
  1179. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1180. if !force && loadedAWSSecret {
  1181. return awsSecret, nil
  1182. }
  1183. loadedAWSSecret = true
  1184. exists, err := util.FileExists(authSecretPath)
  1185. if !exists || err != nil {
  1186. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1187. }
  1188. result, err := ioutil.ReadFile(authSecretPath)
  1189. if err != nil {
  1190. return nil, err
  1191. }
  1192. var ak AWSAccessKey
  1193. err = json.Unmarshal(result, &ak)
  1194. if err != nil {
  1195. return nil, err
  1196. }
  1197. awsSecret = &ak
  1198. return awsSecret, nil
  1199. }
  1200. func getClusterConfig(ccFile string) (map[string]string, error) {
  1201. clusterConfig, err := os.Open(ccFile)
  1202. if err != nil {
  1203. return nil, err
  1204. }
  1205. defer clusterConfig.Close()
  1206. b, err := ioutil.ReadAll(clusterConfig)
  1207. if err != nil {
  1208. return nil, err
  1209. }
  1210. var clusterConf map[string]string
  1211. err = json.Unmarshal([]byte(b), &clusterConf)
  1212. if err != nil {
  1213. return nil, err
  1214. }
  1215. return clusterConf, nil
  1216. }
  1217. func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
  1218. sess, err := session.NewSession(&aws.Config{
  1219. Region: aws.String(region),
  1220. Credentials: credentials.NewEnvCredentials(),
  1221. })
  1222. if err != nil {
  1223. return nil, err
  1224. }
  1225. ec2Svc := ec2.New(sess)
  1226. return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
  1227. }
  1228. func (a *AWS) GetAddresses() ([]byte, error) {
  1229. a.ConfigureAuth() // load authentication data into env vars
  1230. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1231. errorCh := make(chan error, len(awsRegions))
  1232. var wg sync.WaitGroup
  1233. wg.Add(len(awsRegions))
  1234. // Get volumes from each AWS region
  1235. for _, r := range awsRegions {
  1236. // Fetch IP address response and send results and errors to their
  1237. // respective channels
  1238. go func(region string) {
  1239. defer wg.Done()
  1240. defer errors.HandlePanic()
  1241. // Query for first page of volume results
  1242. resp, err := a.getAddressesForRegion(region)
  1243. if err != nil {
  1244. if aerr, ok := err.(awserr.Error); ok {
  1245. switch aerr.Code() {
  1246. default:
  1247. errorCh <- aerr
  1248. }
  1249. return
  1250. } else {
  1251. errorCh <- err
  1252. return
  1253. }
  1254. }
  1255. addressCh <- resp
  1256. }(r)
  1257. }
  1258. // Close the result channels after everything has been sent
  1259. go func() {
  1260. defer errors.HandlePanic()
  1261. wg.Wait()
  1262. close(errorCh)
  1263. close(addressCh)
  1264. }()
  1265. addresses := []*ec2.Address{}
  1266. for adds := range addressCh {
  1267. addresses = append(addresses, adds.Addresses...)
  1268. }
  1269. errors := []error{}
  1270. for err := range errorCh {
  1271. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1272. errors = append(errors, err)
  1273. }
  1274. // Return error if no addresses are returned
  1275. if len(errors) > 0 && len(addresses) == 0 {
  1276. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
  1277. }
  1278. // Format the response this way to match the JSON-encoded formatting of a single response
  1279. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1280. // a "Addresss" key at the top level.
  1281. return json.Marshal(map[string][]*ec2.Address{
  1282. "Addresses": addresses,
  1283. })
  1284. }
  1285. func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1286. sess, err := session.NewSession(&aws.Config{
  1287. Region: aws.String(region),
  1288. Credentials: credentials.NewEnvCredentials(),
  1289. })
  1290. if err != nil {
  1291. return nil, err
  1292. }
  1293. ec2Svc := ec2.New(sess)
  1294. return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
  1295. MaxResults: &maxResults,
  1296. NextToken: nextToken,
  1297. })
  1298. }
  1299. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1300. func (a *AWS) GetDisks() ([]byte, error) {
  1301. a.ConfigureAuth() // load authentication data into env vars
  1302. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1303. errorCh := make(chan error, len(awsRegions))
  1304. var wg sync.WaitGroup
  1305. wg.Add(len(awsRegions))
  1306. // Get volumes from each AWS region
  1307. for _, r := range awsRegions {
  1308. // Fetch volume response and send results and errors to their
  1309. // respective channels
  1310. go func(region string) {
  1311. defer wg.Done()
  1312. defer errors.HandlePanic()
  1313. // Query for first page of volume results
  1314. resp, err := a.getDisksForRegion(region, 1000, nil)
  1315. if err != nil {
  1316. if aerr, ok := err.(awserr.Error); ok {
  1317. switch aerr.Code() {
  1318. default:
  1319. errorCh <- aerr
  1320. }
  1321. return
  1322. } else {
  1323. errorCh <- err
  1324. return
  1325. }
  1326. }
  1327. volumeCh <- resp
  1328. // A NextToken indicates more pages of results. Keep querying
  1329. // until all pages are retrieved.
  1330. for resp.NextToken != nil {
  1331. resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
  1332. if err != nil {
  1333. if aerr, ok := err.(awserr.Error); ok {
  1334. switch aerr.Code() {
  1335. default:
  1336. errorCh <- aerr
  1337. }
  1338. return
  1339. } else {
  1340. errorCh <- err
  1341. return
  1342. }
  1343. }
  1344. volumeCh <- resp
  1345. }
  1346. }(r)
  1347. }
  1348. // Close the result channels after everything has been sent
  1349. go func() {
  1350. defer errors.HandlePanic()
  1351. wg.Wait()
  1352. close(errorCh)
  1353. close(volumeCh)
  1354. }()
  1355. volumes := []*ec2.Volume{}
  1356. for vols := range volumeCh {
  1357. volumes = append(volumes, vols.Volumes...)
  1358. }
  1359. errors := []error{}
  1360. for err := range errorCh {
  1361. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1362. errors = append(errors, err)
  1363. }
  1364. // Return error if no volumes are returned
  1365. if len(errors) > 0 && len(volumes) == 0 {
  1366. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
  1367. }
  1368. // Format the response this way to match the JSON-encoded formatting of a single response
  1369. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1370. // a "Volumes" key at the top level.
  1371. return json.Marshal(map[string][]*ec2.Volume{
  1372. "Volumes": volumes,
  1373. })
  1374. }
  1375. func generateAWSGroupBy(lastIdx int) string {
  1376. sequence := []string{}
  1377. for i := 1; i < lastIdx+1; i++ {
  1378. sequence = append(sequence, strconv.Itoa(i))
  1379. }
  1380. return strings.Join(sequence, ",")
  1381. }
  1382. func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput, *athena.Athena, error) {
  1383. customPricing, err := a.GetConfig()
  1384. if err != nil {
  1385. return nil, nil, err
  1386. }
  1387. a.ConfigureAuthWith(customPricing)
  1388. region := aws.String(customPricing.AthenaRegion)
  1389. resultsBucket := customPricing.AthenaBucketName
  1390. database := customPricing.AthenaDatabase
  1391. c := &aws.Config{
  1392. Region: region,
  1393. STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
  1394. }
  1395. s := session.Must(session.NewSession(c))
  1396. svc := athena.New(s)
  1397. if customPricing.MasterPayerARN != "" {
  1398. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1399. svc = athena.New(s, &aws.Config{
  1400. Region: region,
  1401. Credentials: creds,
  1402. })
  1403. }
  1404. var e athena.StartQueryExecutionInput
  1405. var r athena.ResultConfiguration
  1406. r.SetOutputLocation(resultsBucket)
  1407. e.SetResultConfiguration(&r)
  1408. e.SetQueryString(query)
  1409. var q athena.QueryExecutionContext
  1410. q.SetDatabase(database)
  1411. e.SetQueryExecutionContext(&q)
  1412. res, err := svc.StartQueryExecution(&e)
  1413. if err != nil {
  1414. return nil, svc, err
  1415. }
  1416. klog.V(2).Infof("StartQueryExecution result:")
  1417. klog.V(2).Infof(res.GoString())
  1418. var qri athena.GetQueryExecutionInput
  1419. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1420. var qrop *athena.GetQueryExecutionOutput
  1421. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1422. for {
  1423. qrop, err = svc.GetQueryExecution(&qri)
  1424. if err != nil {
  1425. return nil, svc, err
  1426. }
  1427. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1428. break
  1429. }
  1430. time.Sleep(duration)
  1431. }
  1432. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1433. var ip athena.GetQueryResultsInput
  1434. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1435. return &ip, svc, nil
  1436. } else {
  1437. return nil, svc, fmt.Errorf("No results available for %s", query)
  1438. }
  1439. }
  1440. func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
  1441. customPricing, err := a.GetConfig()
  1442. if err != nil {
  1443. return nil, err
  1444. }
  1445. a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
  1446. region := aws.String(customPricing.AthenaRegion)
  1447. resultsBucket := customPricing.AthenaBucketName
  1448. database := customPricing.AthenaDatabase
  1449. c := &aws.Config{
  1450. Region: region,
  1451. STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
  1452. }
  1453. s := session.Must(session.NewSession(c))
  1454. svc := athena.New(s)
  1455. if customPricing.MasterPayerARN != "" {
  1456. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1457. svc = athena.New(s, &aws.Config{
  1458. Region: region,
  1459. Credentials: creds,
  1460. })
  1461. }
  1462. var e athena.StartQueryExecutionInput
  1463. var r athena.ResultConfiguration
  1464. r.SetOutputLocation(resultsBucket)
  1465. e.SetResultConfiguration(&r)
  1466. e.SetQueryString(query)
  1467. var q athena.QueryExecutionContext
  1468. q.SetDatabase(database)
  1469. e.SetQueryExecutionContext(&q)
  1470. res, err := svc.StartQueryExecution(&e)
  1471. if err != nil {
  1472. return nil, err
  1473. }
  1474. klog.V(2).Infof("StartQueryExecution result:")
  1475. klog.V(2).Infof(res.GoString())
  1476. var qri athena.GetQueryExecutionInput
  1477. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1478. var qrop *athena.GetQueryExecutionOutput
  1479. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1480. for {
  1481. qrop, err = svc.GetQueryExecution(&qri)
  1482. if err != nil {
  1483. return nil, err
  1484. }
  1485. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1486. break
  1487. }
  1488. time.Sleep(duration)
  1489. }
  1490. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1491. var ip athena.GetQueryResultsInput
  1492. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1493. return svc.GetQueryResults(&ip)
  1494. } else {
  1495. return nil, fmt.Errorf("No results available for %s", query)
  1496. }
  1497. }
  1498. type SavingsPlanData struct {
  1499. ResourceID string
  1500. EffectiveCost float64
  1501. SavingsPlanARN string
  1502. MostRecentDate string
  1503. }
  1504. func (a *AWS) GetSavingsPlanDataFromAthena() error {
  1505. cfg, err := a.GetConfig()
  1506. if err != nil {
  1507. return err
  1508. }
  1509. if cfg.AthenaBucketName == "" {
  1510. return fmt.Errorf("No Athena Bucket configured")
  1511. }
  1512. if a.SavingsPlanDataByInstanceID == nil {
  1513. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1514. }
  1515. tNow := time.Now()
  1516. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1517. start := tOneDayAgo.Format("2006-01-02")
  1518. end := tNow.Format("2006-01-02")
  1519. // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
  1520. //
  1521. q := `SELECT
  1522. line_item_usage_start_date,
  1523. savings_plan_savings_plan_a_r_n,
  1524. line_item_resource_id,
  1525. savings_plan_savings_plan_rate
  1526. FROM %s as cost_data
  1527. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1528. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1529. line_item_usage_start_date DESC`
  1530. page := 0
  1531. processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
  1532. a.SavingsPlanDataLock.Lock()
  1533. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
  1534. mostRecentDate := ""
  1535. iter := op.ResultSet.Rows
  1536. if page == 0 && len(iter) > 0 {
  1537. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1538. }
  1539. page++
  1540. for _, r := range iter {
  1541. d := *r.Data[0].VarCharValue
  1542. if mostRecentDate == "" {
  1543. mostRecentDate = d
  1544. } else if mostRecentDate != d { // Get all most recent assignments
  1545. break
  1546. }
  1547. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1548. if err != nil {
  1549. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1550. }
  1551. r := &SavingsPlanData{
  1552. ResourceID: *r.Data[2].VarCharValue,
  1553. EffectiveCost: cost,
  1554. SavingsPlanARN: *r.Data[1].VarCharValue,
  1555. MostRecentDate: d,
  1556. }
  1557. a.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1558. }
  1559. klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
  1560. for k, r := range a.SavingsPlanDataByInstanceID {
  1561. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1562. }
  1563. a.SavingsPlanDataLock.Unlock()
  1564. return true
  1565. }
  1566. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1567. klog.V(3).Infof("Running Query: %s", query)
  1568. ip, svc, err := a.QueryAthenaPaginated(query)
  1569. if err != nil {
  1570. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1571. }
  1572. athenaErr := svc.GetQueryResultsPages(ip, processResults)
  1573. if athenaErr != nil {
  1574. return athenaErr
  1575. }
  1576. return nil
  1577. }
  1578. type RIData struct {
  1579. ResourceID string
  1580. EffectiveCost float64
  1581. ReservationARN string
  1582. MostRecentDate string
  1583. }
  1584. func (a *AWS) GetReservationDataFromAthena() error {
  1585. cfg, err := a.GetConfig()
  1586. if err != nil {
  1587. return err
  1588. }
  1589. if cfg.AthenaBucketName == "" {
  1590. return fmt.Errorf("No Athena Bucket configured")
  1591. }
  1592. // Query for all column names in advance in order to validate configured
  1593. // label columns
  1594. columns, _ := a.ShowAthenaColumns()
  1595. if columns["reservation_reservation_a_r_n"] && columns["reservation_effective_cost"] {
  1596. if a.RIPricingByInstanceID == nil {
  1597. a.RIPricingByInstanceID = make(map[string]*RIData)
  1598. }
  1599. tNow := time.Now()
  1600. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1601. start := tOneDayAgo.Format("2006-01-02")
  1602. end := tNow.Format("2006-01-02")
  1603. q := `SELECT
  1604. line_item_usage_start_date,
  1605. reservation_reservation_a_r_n,
  1606. line_item_resource_id,
  1607. reservation_effective_cost
  1608. FROM %s as cost_data
  1609. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1610. AND reservation_reservation_a_r_n <> '' ORDER BY
  1611. line_item_usage_start_date DESC`
  1612. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1613. op, err := a.QueryAthenaBillingData(query)
  1614. if err != nil {
  1615. a.RIPricingStatus = err.Error()
  1616. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1617. }
  1618. a.RIPricingStatus = ""
  1619. klog.Infof("Fetching RI data...")
  1620. if len(op.ResultSet.Rows) > 1 {
  1621. a.RIDataLock.Lock()
  1622. mostRecentDate := ""
  1623. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1624. d := *r.Data[0].VarCharValue
  1625. if mostRecentDate == "" {
  1626. mostRecentDate = d
  1627. } else if mostRecentDate != d { // Get all most recent assignments
  1628. break
  1629. }
  1630. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1631. if err != nil {
  1632. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1633. }
  1634. r := &RIData{
  1635. ResourceID: *r.Data[2].VarCharValue,
  1636. EffectiveCost: cost,
  1637. ReservationARN: *r.Data[1].VarCharValue,
  1638. MostRecentDate: d,
  1639. }
  1640. a.RIPricingByInstanceID[r.ResourceID] = r
  1641. }
  1642. klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
  1643. for k, r := range a.RIPricingByInstanceID {
  1644. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1645. }
  1646. a.RIDataLock.Unlock()
  1647. } else {
  1648. klog.Infof("No reserved instance data found")
  1649. }
  1650. } else {
  1651. klog.Infof("No reserved data available in Athena")
  1652. a.RIPricingStatus = ""
  1653. }
  1654. return nil
  1655. }
  1656. // ShowAthenaColumns returns a list of the names of all columns in the configured
  1657. // Athena tables
  1658. func (aws *AWS) ShowAthenaColumns() (map[string]bool, error) {
  1659. columnSet := map[string]bool{}
  1660. // Configure Athena query
  1661. cfg, err := aws.GetConfig()
  1662. if err != nil {
  1663. return nil, err
  1664. }
  1665. if cfg.AthenaTable == "" {
  1666. return nil, fmt.Errorf("AthenaTable not configured")
  1667. }
  1668. if cfg.AthenaBucketName == "" {
  1669. return nil, fmt.Errorf("AthenaBucketName not configured")
  1670. }
  1671. q := `SHOW COLUMNS IN %s`
  1672. query := fmt.Sprintf(q, cfg.AthenaTable)
  1673. results, svc, err := aws.QueryAthenaPaginated(query)
  1674. columns := []string{}
  1675. pageNum := 0
  1676. athenaErr := svc.GetQueryResultsPages(results, func(page *athena.GetQueryResultsOutput, lastpage bool) bool {
  1677. for _, row := range page.ResultSet.Rows {
  1678. columns = append(columns, *row.Data[0].VarCharValue)
  1679. }
  1680. pageNum++
  1681. return true
  1682. })
  1683. if athenaErr != nil {
  1684. log.Warningf("Error getting Athena columns: %s", err)
  1685. return columnSet, athenaErr
  1686. }
  1687. for _, col := range columns {
  1688. columnSet[col] = true
  1689. }
  1690. return columnSet, nil
  1691. }
  1692. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  1693. // "start" and "end" are dates of the format YYYY-MM-DD
  1694. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  1695. func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
  1696. customPricing, err := a.GetConfig()
  1697. if err != nil {
  1698. return nil, err
  1699. }
  1700. formattedAggregators := []string{}
  1701. for _, agg := range aggregators {
  1702. aggregator_column_name := "resource_tags_user_" + agg
  1703. aggregator_column_name = cloudutil.ConvertToGlueColumnFormat(aggregator_column_name)
  1704. formattedAggregators = append(formattedAggregators, aggregator_column_name)
  1705. }
  1706. aggregatorNames := strings.Join(formattedAggregators, ",")
  1707. aggregatorOr := strings.Join(formattedAggregators, " <> '' OR ")
  1708. aggregatorOr = aggregatorOr + " <> ''"
  1709. filter_column_name := "resource_tags_user_" + filterType
  1710. filter_column_name = cloudutil.ConvertToGlueColumnFormat(filter_column_name)
  1711. var query string
  1712. var lastIdx int
  1713. if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
  1714. lastIdx = len(formattedAggregators) + 3
  1715. groupby := generateAWSGroupBy(lastIdx)
  1716. query = fmt.Sprintf(`SELECT
  1717. CAST(line_item_usage_start_date AS DATE) as start_date,
  1718. %s,
  1719. line_item_product_code,
  1720. %s,
  1721. SUM(line_item_blended_cost) as blended_cost
  1722. FROM %s as cost_data
  1723. WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1724. GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, aggregatorOr, groupby)
  1725. } else {
  1726. lastIdx = len(formattedAggregators) + 2
  1727. groupby := generateAWSGroupBy(lastIdx)
  1728. query = fmt.Sprintf(`SELECT
  1729. CAST(line_item_usage_start_date AS DATE) as start_date,
  1730. %s,
  1731. line_item_product_code,
  1732. SUM(line_item_blended_cost) as blended_cost
  1733. FROM %s as cost_data
  1734. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1735. GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
  1736. }
  1737. var oocAllocs []*OutOfClusterAllocation
  1738. page := 0
  1739. processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
  1740. iter := op.ResultSet.Rows
  1741. if page == 0 && len(iter) > 0 {
  1742. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1743. }
  1744. page++
  1745. for _, r := range iter {
  1746. cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
  1747. if err != nil {
  1748. klog.Infof("Error converting cost `%s` from float ", *r.Data[lastIdx].VarCharValue)
  1749. }
  1750. environment := ""
  1751. for _, d := range r.Data[1 : len(formattedAggregators)+1] {
  1752. if *d.VarCharValue != "" {
  1753. environment = *d.VarCharValue // just set to the first nonempty match
  1754. }
  1755. break
  1756. }
  1757. ooc := &OutOfClusterAllocation{
  1758. Aggregator: strings.Join(aggregators, ","),
  1759. Environment: environment,
  1760. Service: *r.Data[len(formattedAggregators)+1].VarCharValue,
  1761. Cost: cost,
  1762. }
  1763. oocAllocs = append(oocAllocs, ooc)
  1764. }
  1765. return true
  1766. }
  1767. // Query for all column names in advance in order to validate configured
  1768. // label columns
  1769. columns, _ := a.ShowAthenaColumns()
  1770. // Check for all aggregators being formatted into the query
  1771. containsColumns := true
  1772. for _, agg := range formattedAggregators {
  1773. if columns[agg] != true {
  1774. containsColumns = false
  1775. klog.Warningf("Athena missing column: %s", agg)
  1776. }
  1777. }
  1778. if containsColumns {
  1779. klog.V(3).Infof("Running Query: %s", query)
  1780. ip, svc, _ := a.QueryAthenaPaginated(query)
  1781. athenaErr := svc.GetQueryResultsPages(ip, processResults)
  1782. if athenaErr != nil {
  1783. klog.Infof("RETURNING ATHENA ERROR")
  1784. return nil, athenaErr
  1785. }
  1786. if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
  1787. gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
  1788. if err != nil {
  1789. klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
  1790. }
  1791. gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
  1792. if err != nil {
  1793. klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
  1794. }
  1795. oocAllocs = append(oocAllocs, gcpOOC...)
  1796. }
  1797. } else {
  1798. klog.Infof("External Allocations: Athena Query skipped due to missing columns")
  1799. }
  1800. return oocAllocs, nil
  1801. }
  1802. // QuerySQL can query a properly configured Athena database.
  1803. // Used to fetch billing data.
  1804. // Requires a json config in /var/configs with key region, output, and database.
  1805. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1806. customPricing, err := a.GetConfig()
  1807. if err != nil {
  1808. return nil, err
  1809. }
  1810. a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
  1811. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1812. if err != nil {
  1813. return nil, err
  1814. }
  1815. defer athenaConfigs.Close()
  1816. b, err := ioutil.ReadAll(athenaConfigs)
  1817. if err != nil {
  1818. return nil, err
  1819. }
  1820. var athenaConf map[string]string
  1821. json.Unmarshal([]byte(b), &athenaConf)
  1822. region := aws.String(customPricing.AthenaRegion)
  1823. resultsBucket := customPricing.AthenaBucketName
  1824. database := customPricing.AthenaDatabase
  1825. c := &aws.Config{
  1826. Region: region,
  1827. }
  1828. s := session.Must(session.NewSession(c))
  1829. svc := athena.New(s)
  1830. var e athena.StartQueryExecutionInput
  1831. var r athena.ResultConfiguration
  1832. r.SetOutputLocation(resultsBucket)
  1833. e.SetResultConfiguration(&r)
  1834. e.SetQueryString(query)
  1835. var q athena.QueryExecutionContext
  1836. q.SetDatabase(database)
  1837. e.SetQueryExecutionContext(&q)
  1838. res, err := svc.StartQueryExecution(&e)
  1839. if err != nil {
  1840. return nil, err
  1841. }
  1842. klog.V(2).Infof("StartQueryExecution result:")
  1843. klog.V(2).Infof(res.GoString())
  1844. var qri athena.GetQueryExecutionInput
  1845. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1846. var qrop *athena.GetQueryExecutionOutput
  1847. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1848. for {
  1849. qrop, err = svc.GetQueryExecution(&qri)
  1850. if err != nil {
  1851. return nil, err
  1852. }
  1853. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1854. break
  1855. }
  1856. time.Sleep(duration)
  1857. }
  1858. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1859. var ip athena.GetQueryResultsInput
  1860. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1861. op, err := svc.GetQueryResults(&ip)
  1862. if err != nil {
  1863. return nil, err
  1864. }
  1865. b, err := json.Marshal(op.ResultSet)
  1866. if err != nil {
  1867. return nil, err
  1868. }
  1869. return b, nil
  1870. }
  1871. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1872. }
  1873. type spotInfo struct {
  1874. Timestamp string `csv:"Timestamp"`
  1875. UsageType string `csv:"UsageType"`
  1876. Operation string `csv:"Operation"`
  1877. InstanceID string `csv:"InstanceID"`
  1878. MyBidID string `csv:"MyBidID"`
  1879. MyMaxPrice string `csv:"MyMaxPrice"`
  1880. MarketPrice string `csv:"MarketPrice"`
  1881. Charge string `csv:"Charge"`
  1882. Version string `csv:"Version"`
  1883. }
  1884. type fnames []*string
  1885. func (f fnames) Len() int {
  1886. return len(f)
  1887. }
  1888. func (f fnames) Swap(i, j int) {
  1889. f[i], f[j] = f[j], f[i]
  1890. }
  1891. func (f fnames) Less(i, j int) bool {
  1892. key1 := strings.Split(*f[i], ".")
  1893. key2 := strings.Split(*f[j], ".")
  1894. t1, err := time.Parse("2006-01-02-15", key1[1])
  1895. if err != nil {
  1896. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1897. return false
  1898. }
  1899. t2, err := time.Parse("2006-01-02-15", key2[1])
  1900. if err != nil {
  1901. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1902. return false
  1903. }
  1904. return t1.Before(t2)
  1905. }
  1906. func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
  1907. if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
  1908. a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1909. }
  1910. a.ConfigureAuth() // configure aws api authentication by setting env vars
  1911. s3Prefix := projectID
  1912. if len(prefix) != 0 {
  1913. s3Prefix = prefix + "/" + s3Prefix
  1914. }
  1915. c := aws.NewConfig().WithRegion(region)
  1916. s := session.Must(session.NewSession(c))
  1917. s3Svc := s3.New(s)
  1918. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1919. tNow := time.Now()
  1920. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1921. ls := &s3.ListObjectsInput{
  1922. Bucket: aws.String(bucket),
  1923. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1924. }
  1925. ls2 := &s3.ListObjectsInput{
  1926. Bucket: aws.String(bucket),
  1927. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1928. }
  1929. lso, err := s3Svc.ListObjects(ls)
  1930. if err != nil {
  1931. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1932. Message: "Bucket List Permissions Available",
  1933. Status: false,
  1934. AdditionalInfo: err.Error(),
  1935. }
  1936. return nil, err
  1937. } else {
  1938. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1939. Message: "Bucket List Permissions Available",
  1940. Status: true,
  1941. }
  1942. }
  1943. lsoLen := len(lso.Contents)
  1944. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1945. if lsoLen == 0 {
  1946. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1947. }
  1948. lso2, err := s3Svc.ListObjects(ls2)
  1949. if err != nil {
  1950. return nil, err
  1951. }
  1952. lso2Len := len(lso2.Contents)
  1953. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1954. if lso2Len == 0 {
  1955. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1956. }
  1957. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1958. var keys []*string
  1959. for _, obj := range lso.Contents {
  1960. keys = append(keys, obj.Key)
  1961. }
  1962. for _, obj := range lso2.Contents {
  1963. keys = append(keys, obj.Key)
  1964. }
  1965. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1966. header, err := csvutil.Header(spotInfo{}, "csv")
  1967. if err != nil {
  1968. return nil, err
  1969. }
  1970. fieldsPerRecord := len(header)
  1971. spots := make(map[string]*spotInfo)
  1972. for _, key := range keys {
  1973. getObj := &s3.GetObjectInput{
  1974. Bucket: aws.String(bucket),
  1975. Key: key,
  1976. }
  1977. buf := aws.NewWriteAtBuffer([]byte{})
  1978. _, err := downloader.Download(buf, getObj)
  1979. if err != nil {
  1980. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1981. Message: "Object Get Permissions Available",
  1982. Status: false,
  1983. AdditionalInfo: err.Error(),
  1984. }
  1985. return nil, err
  1986. } else {
  1987. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1988. Message: "Object Get Permissions Available",
  1989. Status: true,
  1990. }
  1991. }
  1992. r := bytes.NewReader(buf.Bytes())
  1993. gr, err := gzip.NewReader(r)
  1994. if err != nil {
  1995. return nil, err
  1996. }
  1997. csvReader := csv.NewReader(gr)
  1998. csvReader.Comma = '\t'
  1999. csvReader.FieldsPerRecord = fieldsPerRecord
  2000. dec, err := csvutil.NewDecoder(csvReader, header...)
  2001. if err != nil {
  2002. return nil, err
  2003. }
  2004. var foundVersion string
  2005. for {
  2006. spot := spotInfo{}
  2007. err := dec.Decode(&spot)
  2008. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  2009. if err == io.EOF {
  2010. break
  2011. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  2012. rec := dec.Record()
  2013. // the first two "Record()" will be the comment lines
  2014. // and they show up as len() == 1
  2015. // the first of which is "#Version"
  2016. // the second of which is "#Fields: "
  2017. if len(rec) != 1 {
  2018. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  2019. continue
  2020. }
  2021. if len(foundVersion) == 0 {
  2022. spotFeedVersion := rec[0]
  2023. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  2024. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  2025. if matches != nil {
  2026. foundVersion = matches[1]
  2027. if foundVersion != supportedSpotFeedVersion {
  2028. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  2029. break
  2030. }
  2031. }
  2032. continue
  2033. } else if strings.Index(rec[0], "#") == 0 {
  2034. continue
  2035. } else {
  2036. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  2037. continue
  2038. }
  2039. } else if err != nil {
  2040. klog.V(2).Infof("Error during spot info decode: %+v", err)
  2041. continue
  2042. }
  2043. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  2044. spots[spot.InstanceID] = &spot
  2045. }
  2046. gr.Close()
  2047. }
  2048. return spots, nil
  2049. }
  2050. func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  2051. }
  2052. func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  2053. checks := []*ServiceAccountCheck{}
  2054. for _, v := range a.ServiceAccountChecks {
  2055. checks = append(checks, v)
  2056. }
  2057. return &ServiceAccountStatus{
  2058. Checks: checks,
  2059. }
  2060. }
  2061. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  2062. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  2063. }