2
0

awsprovider.go 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/csv"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/aws/aws-sdk-go/aws/endpoints"
  18. "k8s.io/klog"
  19. "github.com/kubecost/cost-model/pkg/clustercache"
  20. "github.com/kubecost/cost-model/pkg/env"
  21. "github.com/kubecost/cost-model/pkg/errors"
  22. "github.com/kubecost/cost-model/pkg/log"
  23. "github.com/kubecost/cost-model/pkg/util"
  24. "github.com/kubecost/cost-model/pkg/util/fileutil"
  25. "github.com/kubecost/cost-model/pkg/util/json"
  26. "github.com/aws/aws-sdk-go/aws"
  27. "github.com/aws/aws-sdk-go/aws/awserr"
  28. "github.com/aws/aws-sdk-go/aws/credentials"
  29. "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
  30. "github.com/aws/aws-sdk-go/aws/session"
  31. "github.com/aws/aws-sdk-go/service/athena"
  32. "github.com/aws/aws-sdk-go/service/ec2"
  33. "github.com/aws/aws-sdk-go/service/s3"
  34. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  35. awsV2 "github.com/aws/aws-sdk-go-v2/aws"
  36. "github.com/jszwec/csvutil"
  37. v1 "k8s.io/api/core/v1"
  38. )
  39. const supportedSpotFeedVersion = "1"
  40. const SpotInfoUpdateType = "spotinfo"
  41. const AthenaInfoUpdateType = "athenainfo"
  42. const PreemptibleType = "preemptible"
  43. const APIPricingSource = "Public API"
  44. const SpotPricingSource = "Spot Data Feed"
  45. const ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
  46. func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
  47. sources := make(map[string]*PricingSource)
  48. sps := &PricingSource{
  49. Name: SpotPricingSource,
  50. }
  51. sps.Error = ""
  52. if aws.SpotPricingError != nil {
  53. sps.Error = aws.SpotPricingError.Error()
  54. }
  55. if sps.Error != "" {
  56. sps.Available = false
  57. } else if len(aws.SpotPricingByInstanceID) > 0 {
  58. sps.Available = true
  59. } else {
  60. sps.Error = "No spot instances detected"
  61. }
  62. sources[SpotPricingSource] = sps
  63. rps := &PricingSource{
  64. Name: ReservedInstancePricingSource,
  65. }
  66. rps.Error = ""
  67. if aws.RIPricingError != nil {
  68. rps.Error = aws.RIPricingError.Error()
  69. }
  70. if rps.Error != "" {
  71. rps.Available = false
  72. } else {
  73. rps.Available = true
  74. }
  75. sources[ReservedInstancePricingSource] = rps
  76. return sources
  77. }
  78. // How often spot data is refreshed
  79. const SpotRefreshDuration = 15 * time.Minute
  80. const defaultConfigPath = "/var/configs/"
  81. var awsRegions = []string{
  82. "us-east-2",
  83. "us-east-1",
  84. "us-west-1",
  85. "us-west-2",
  86. "ap-east-1",
  87. "ap-south-1",
  88. "ap-northeast-3",
  89. "ap-northeast-2",
  90. "ap-southeast-1",
  91. "ap-southeast-2",
  92. "ap-northeast-1",
  93. "ca-central-1",
  94. "cn-north-1",
  95. "cn-northwest-1",
  96. "eu-central-1",
  97. "eu-west-1",
  98. "eu-west-2",
  99. "eu-west-3",
  100. "eu-north-1",
  101. "me-south-1",
  102. "sa-east-1",
  103. "us-gov-east-1",
  104. "us-gov-west-1",
  105. }
  106. // AWS represents an Amazon Provider
  107. type AWS struct {
  108. Pricing map[string]*AWSProductTerms
  109. SpotPricingByInstanceID map[string]*spotInfo
  110. SpotPricingUpdatedAt *time.Time
  111. SpotRefreshRunning bool
  112. SpotPricingLock sync.RWMutex
  113. SpotPricingError error
  114. RIPricingByInstanceID map[string]*RIData
  115. RIPricingError error
  116. RIDataRunning bool
  117. RIDataLock sync.RWMutex
  118. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  119. SavingsPlanDataRunning bool
  120. SavingsPlanDataLock sync.RWMutex
  121. ValidPricingKeys map[string]bool
  122. Clientset clustercache.ClusterCache
  123. BaseCPUPrice string
  124. BaseRAMPrice string
  125. BaseGPUPrice string
  126. BaseSpotCPUPrice string
  127. BaseSpotRAMPrice string
  128. BaseSpotGPUPrice string
  129. SpotLabelName string
  130. SpotLabelValue string
  131. SpotDataRegion string
  132. SpotDataBucket string
  133. SpotDataPrefix string
  134. ProjectID string
  135. DownloadPricingDataLock sync.RWMutex
  136. Config *ProviderConfig
  137. ServiceAccountChecks map[string]*ServiceAccountCheck
  138. clusterManagementPrice float64
  139. clusterAccountId string
  140. clusterRegion string
  141. clusterProvisioner string
  142. *CustomProvider
  143. }
  144. type AWSAccessKey struct {
  145. AccessKeyID string `json:"aws_access_key_id"`
  146. SecretAccessKey string `json:"aws_secret_access_key"`
  147. }
  148. // Retrieve returns a set of awsV2 credentials using the AWSAccessKey's key and secret.
  149. // This fullfils the awsV2.CredentialsProvider interface contract.
  150. func (accessKey AWSAccessKey) Retrieve(ctx context.Context) (awsV2.Credentials, error) {
  151. return awsV2.Credentials{
  152. AccessKeyID: accessKey.AccessKeyID,
  153. SecretAccessKey: accessKey.SecretAccessKey,
  154. }, nil
  155. }
  156. // AWSPricing maps a k8s node to an AWS Pricing "product"
  157. type AWSPricing struct {
  158. Products map[string]*AWSProduct `json:"products"`
  159. Terms AWSPricingTerms `json:"terms"`
  160. }
  161. // AWSProduct represents a purchased SKU
  162. type AWSProduct struct {
  163. Sku string `json:"sku"`
  164. Attributes AWSProductAttributes `json:"attributes"`
  165. }
  166. // AWSProductAttributes represents metadata about the product used to map to a node.
  167. type AWSProductAttributes struct {
  168. Location string `json:"location"`
  169. InstanceType string `json:"instanceType"`
  170. Memory string `json:"memory"`
  171. Storage string `json:"storage"`
  172. VCpu string `json:"vcpu"`
  173. UsageType string `json:"usagetype"`
  174. OperatingSystem string `json:"operatingSystem"`
  175. PreInstalledSw string `json:"preInstalledSw"`
  176. InstanceFamily string `json:"instanceFamily"`
  177. CapacityStatus string `json:"capacitystatus"`
  178. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  179. }
  180. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  181. type AWSPricingTerms struct {
  182. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  183. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  184. }
  185. // AWSOfferTerm is a sku extension used to pay for the node.
  186. type AWSOfferTerm struct {
  187. Sku string `json:"sku"`
  188. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  189. }
  190. func (ot *AWSOfferTerm) String() string {
  191. var strs []string
  192. for k, rc := range ot.PriceDimensions {
  193. strs = append(strs, fmt.Sprintf("%s:%s", k, rc.String()))
  194. }
  195. return fmt.Sprintf("%s:%s", ot.Sku, strings.Join(strs, ","))
  196. }
  197. // AWSRateCode encodes data about the price of a product
  198. type AWSRateCode struct {
  199. Unit string `json:"unit"`
  200. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  201. }
  202. func (rc *AWSRateCode) String() string {
  203. return fmt.Sprintf("{unit: %s, pricePerUnit: %v", rc.Unit, rc.PricePerUnit)
  204. }
  205. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  206. type AWSCurrencyCode struct {
  207. USD string `json:"USD,omitempty"`
  208. CNY string `json:"CNY,omitempty"`
  209. }
  210. // AWSProductTerms represents the full terms of the product
  211. type AWSProductTerms struct {
  212. Sku string `json:"sku"`
  213. OnDemand *AWSOfferTerm `json:"OnDemand"`
  214. Reserved *AWSOfferTerm `json:"Reserved"`
  215. Memory string `json:"memory"`
  216. Storage string `json:"storage"`
  217. VCpu string `json:"vcpu"`
  218. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  219. PV *PV `json:"pv"`
  220. }
  221. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  222. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  223. // OnDemandRateCode is appended to an node sku
  224. const OnDemandRateCode = ".JRTCKXETXF"
  225. const OnDemandRateCodeCn = ".99YE2YK9UR"
  226. // ReservedRateCode is appended to a node sku
  227. const ReservedRateCode = ".38NPMPTW36"
  228. // HourlyRateCode is appended to a node sku
  229. const HourlyRateCode = ".6YS6EN2CT7"
  230. const HourlyRateCodeCn = ".Q7UJUT2CE6"
  231. // volTypes are used to map between AWS UsageTypes and
  232. // EBS volume types, as they would appear in K8s storage class
  233. // name and the EC2 API.
  234. var volTypes = map[string]string{
  235. "EBS:VolumeUsage.gp2": "gp2",
  236. "EBS:VolumeUsage": "standard",
  237. "EBS:VolumeUsage.sc1": "sc1",
  238. "EBS:VolumeP-IOPS.piops": "io1",
  239. "EBS:VolumeUsage.st1": "st1",
  240. "EBS:VolumeUsage.piops": "io1",
  241. "gp2": "EBS:VolumeUsage.gp2",
  242. "standard": "EBS:VolumeUsage",
  243. "sc1": "EBS:VolumeUsage.sc1",
  244. "io1": "EBS:VolumeUsage.piops",
  245. "st1": "EBS:VolumeUsage.st1",
  246. }
  247. // locationToRegion maps AWS region names (As they come from Billing)
  248. // to actual region identifiers
  249. var locationToRegion = map[string]string{
  250. "US East (Ohio)": "us-east-2",
  251. "US East (N. Virginia)": "us-east-1",
  252. "US West (N. California)": "us-west-1",
  253. "US West (Oregon)": "us-west-2",
  254. "Asia Pacific (Hong Kong)": "ap-east-1",
  255. "Asia Pacific (Mumbai)": "ap-south-1",
  256. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  257. "Asia Pacific (Seoul)": "ap-northeast-2",
  258. "Asia Pacific (Singapore)": "ap-southeast-1",
  259. "Asia Pacific (Sydney)": "ap-southeast-2",
  260. "Asia Pacific (Tokyo)": "ap-northeast-1",
  261. "Canada (Central)": "ca-central-1",
  262. "China (Beijing)": "cn-north-1",
  263. "China (Ningxia)": "cn-northwest-1",
  264. "EU (Frankfurt)": "eu-central-1",
  265. "EU (Ireland)": "eu-west-1",
  266. "EU (London)": "eu-west-2",
  267. "EU (Paris)": "eu-west-3",
  268. "EU (Stockholm)": "eu-north-1",
  269. "South America (Sao Paulo)": "sa-east-1",
  270. "AWS GovCloud (US-East)": "us-gov-east-1",
  271. "AWS GovCloud (US-West)": "us-gov-west-1",
  272. }
  273. var regionToBillingRegionCode = map[string]string{
  274. "us-east-2": "USE2",
  275. "us-east-1": "",
  276. "us-west-1": "USW1",
  277. "us-west-2": "USW2",
  278. "ap-east-1": "APE1",
  279. "ap-south-1": "APS3",
  280. "ap-northeast-3": "APN3",
  281. "ap-northeast-2": "APN2",
  282. "ap-southeast-1": "APS1",
  283. "ap-southeast-2": "APS2",
  284. "ap-northeast-1": "APN1",
  285. "ca-central-1": "CAN1",
  286. "cn-north-1": "",
  287. "cn-northwest-1": "",
  288. "eu-central-1": "EUC1",
  289. "eu-west-1": "EU",
  290. "eu-west-2": "EUW2",
  291. "eu-west-3": "EUW3",
  292. "eu-north-1": "EUN1",
  293. "sa-east-1": "SAE1",
  294. "us-gov-east-1": "UGE1",
  295. "us-gov-west-1": "UGW1",
  296. }
  297. var loadedAWSSecret bool = false
  298. var awsSecret *AWSAccessKey = nil
  299. func (aws *AWS) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
  300. return ""
  301. }
  302. // KubeAttrConversion maps the k8s labels for region to an aws region
  303. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  304. operatingSystem = strings.ToLower(operatingSystem)
  305. region := locationToRegion[location]
  306. return region + "," + instanceType + "," + operatingSystem
  307. }
  308. type AwsSpotFeedInfo struct {
  309. BucketName string `json:"bucketName"`
  310. Prefix string `json:"prefix"`
  311. Region string `json:"region"`
  312. AccountID string `json:"projectID"`
  313. ServiceKeyName string `json:"serviceKeyName"`
  314. ServiceKeySecret string `json:"serviceKeySecret"`
  315. SpotLabel string `json:"spotLabel"`
  316. SpotLabelValue string `json:"spotLabelValue"`
  317. }
  318. type AwsAthenaInfo struct {
  319. AthenaBucketName string `json:"athenaBucketName"`
  320. AthenaRegion string `json:"athenaRegion"`
  321. AthenaDatabase string `json:"athenaDatabase"`
  322. AthenaTable string `json:"athenaTable"`
  323. ServiceKeyName string `json:"serviceKeyName"`
  324. ServiceKeySecret string `json:"serviceKeySecret"`
  325. AccountID string `json:"projectID"`
  326. MasterPayerARN string `json:"masterPayerARN"`
  327. }
  328. func (aws *AWS) GetManagementPlatform() (string, error) {
  329. nodes := aws.Clientset.GetAllNodes()
  330. if len(nodes) > 0 {
  331. n := nodes[0]
  332. version := n.Status.NodeInfo.KubeletVersion
  333. if strings.Contains(version, "eks") {
  334. return "eks", nil
  335. }
  336. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  337. return "kops", nil
  338. }
  339. }
  340. return "", nil
  341. }
  342. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  343. c, err := aws.Config.GetCustomPricingData()
  344. if err != nil {
  345. return nil, err
  346. }
  347. if c.Discount == "" {
  348. c.Discount = "0%"
  349. }
  350. if c.NegotiatedDiscount == "" {
  351. c.NegotiatedDiscount = "0%"
  352. }
  353. if c.ShareTenancyCosts == "" {
  354. c.ShareTenancyCosts = defaultShareTenancyCost
  355. }
  356. return c, nil
  357. }
  358. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  359. return aws.Config.UpdateFromMap(a)
  360. }
  361. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  362. return aws.Config.Update(func(c *CustomPricing) error {
  363. if updateType == SpotInfoUpdateType {
  364. a := AwsSpotFeedInfo{}
  365. err := json.NewDecoder(r).Decode(&a)
  366. if err != nil {
  367. return err
  368. }
  369. c.ServiceKeyName = a.ServiceKeyName
  370. if a.ServiceKeySecret != "" {
  371. c.ServiceKeySecret = a.ServiceKeySecret
  372. }
  373. c.SpotDataPrefix = a.Prefix
  374. c.SpotDataBucket = a.BucketName
  375. c.ProjectID = a.AccountID
  376. c.SpotDataRegion = a.Region
  377. c.SpotLabel = a.SpotLabel
  378. c.SpotLabelValue = a.SpotLabelValue
  379. } else if updateType == AthenaInfoUpdateType {
  380. a := AwsAthenaInfo{}
  381. err := json.NewDecoder(r).Decode(&a)
  382. if err != nil {
  383. return err
  384. }
  385. c.AthenaBucketName = a.AthenaBucketName
  386. c.AthenaRegion = a.AthenaRegion
  387. c.AthenaDatabase = a.AthenaDatabase
  388. c.AthenaTable = a.AthenaTable
  389. c.ServiceKeyName = a.ServiceKeyName
  390. if a.ServiceKeySecret != "" {
  391. c.ServiceKeySecret = a.ServiceKeySecret
  392. }
  393. if a.MasterPayerARN != "" {
  394. c.MasterPayerARN = a.MasterPayerARN
  395. }
  396. c.AthenaProjectID = a.AccountID
  397. } else {
  398. a := make(map[string]interface{})
  399. err := json.NewDecoder(r).Decode(&a)
  400. if err != nil {
  401. return err
  402. }
  403. for k, v := range a {
  404. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  405. vstr, ok := v.(string)
  406. if ok {
  407. err := SetCustomPricingField(c, kUpper, vstr)
  408. if err != nil {
  409. return err
  410. }
  411. } else {
  412. return fmt.Errorf("type error while updating config for %s", kUpper)
  413. }
  414. }
  415. }
  416. if env.IsRemoteEnabled() {
  417. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  418. if err != nil {
  419. return err
  420. }
  421. }
  422. return nil
  423. })
  424. }
  425. type awsKey struct {
  426. SpotLabelName string
  427. SpotLabelValue string
  428. Labels map[string]string
  429. ProviderID string
  430. }
  431. func (k *awsKey) GPUType() string {
  432. return ""
  433. }
  434. func (k *awsKey) ID() string {
  435. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  436. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  437. if matchNum == 2 {
  438. return group
  439. }
  440. }
  441. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  442. return ""
  443. }
  444. func (k *awsKey) Features() string {
  445. instanceType, _ := util.GetInstanceType(k.Labels)
  446. operatingSystem, _ := util.GetOperatingSystem(k.Labels)
  447. region, _ := util.GetRegion(k.Labels)
  448. key := region + "," + instanceType + "," + operatingSystem
  449. usageType := PreemptibleType
  450. spotKey := key + "," + usageType
  451. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  452. return spotKey
  453. }
  454. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  455. return spotKey
  456. }
  457. return key
  458. }
  459. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  460. pricing, ok := aws.Pricing[pvk.Features()]
  461. if !ok {
  462. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  463. return &PV{}, nil
  464. }
  465. return pricing.PV, nil
  466. }
  467. type awsPVKey struct {
  468. Labels map[string]string
  469. StorageClassParameters map[string]string
  470. StorageClassName string
  471. Name string
  472. DefaultRegion string
  473. ProviderID string
  474. }
  475. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  476. providerID := ""
  477. if pv.Spec.AWSElasticBlockStore != nil {
  478. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  479. } else if pv.Spec.CSI != nil {
  480. providerID = pv.Spec.CSI.VolumeHandle
  481. }
  482. return &awsPVKey{
  483. Labels: pv.Labels,
  484. StorageClassName: pv.Spec.StorageClassName,
  485. StorageClassParameters: parameters,
  486. Name: pv.Name,
  487. DefaultRegion: defaultRegion,
  488. ProviderID: providerID,
  489. }
  490. }
  491. func (key *awsPVKey) ID() string {
  492. return key.ProviderID
  493. }
  494. func (key *awsPVKey) GetStorageClass() string {
  495. return key.StorageClassName
  496. }
  497. func (key *awsPVKey) Features() string {
  498. storageClass := key.StorageClassParameters["type"]
  499. if storageClass == "standard" {
  500. storageClass = "gp2"
  501. }
  502. // Storage class names are generally EBS volume types (gp2)
  503. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  504. // Converts between the 2
  505. region, ok := util.GetRegion(key.Labels)
  506. if !ok {
  507. region = key.DefaultRegion
  508. }
  509. class, ok := volTypes[storageClass]
  510. if !ok {
  511. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  512. }
  513. return region + "," + class
  514. }
  515. // GetKey maps node labels to information needed to retrieve pricing data
  516. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  517. return &awsKey{
  518. SpotLabelName: aws.SpotLabelName,
  519. SpotLabelValue: aws.SpotLabelValue,
  520. Labels: labels,
  521. ProviderID: labels["providerID"],
  522. }
  523. }
  524. func (aws *AWS) isPreemptible(key string) bool {
  525. s := strings.Split(key, ",")
  526. if len(s) == 4 && s[3] == PreemptibleType {
  527. return true
  528. }
  529. return false
  530. }
  531. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  532. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  533. }
  534. // Use the pricing data from the current region. Fall back to using all region data if needed.
  535. func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, error) {
  536. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/"
  537. region := ""
  538. multiregion := false
  539. for _, n := range nodeList {
  540. labels := n.GetLabels()
  541. currentNodeRegion := ""
  542. if r, ok := util.GetRegion(labels); ok {
  543. currentNodeRegion = r
  544. // Switch to Chinese endpoint for regions with the Chinese prefix
  545. if strings.HasPrefix(currentNodeRegion, "cn-") {
  546. pricingURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/AmazonEC2/current/"
  547. }
  548. } else {
  549. multiregion = true // We weren't able to detect the node's region, so pull all data.
  550. break
  551. }
  552. if region == "" { // We haven't set a region yet
  553. region = currentNodeRegion
  554. } else if region != "" && currentNodeRegion != region { // If two nodes have different regions here, we'll need to fetch all pricing data.
  555. multiregion = true
  556. break
  557. }
  558. }
  559. // Chinese multiregion endpoint only contains data for Chinese regions and Chinese regions are excluded from other endpoint
  560. if region != "" && !multiregion {
  561. pricingURL += region + "/"
  562. }
  563. pricingURL += "index.json"
  564. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  565. resp, err := http.Get(pricingURL)
  566. if err != nil {
  567. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  568. return nil, pricingURL, err
  569. }
  570. return resp, pricingURL, err
  571. }
  572. // DownloadPricingData fetches data from the AWS Pricing API
  573. func (aws *AWS) DownloadPricingData() error {
  574. aws.DownloadPricingDataLock.Lock()
  575. defer aws.DownloadPricingDataLock.Unlock()
  576. if aws.ServiceAccountChecks == nil {
  577. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  578. }
  579. c, err := aws.Config.GetCustomPricingData()
  580. if err != nil {
  581. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  582. }
  583. aws.BaseCPUPrice = c.CPU
  584. aws.BaseRAMPrice = c.RAM
  585. aws.BaseGPUPrice = c.GPU
  586. aws.BaseSpotCPUPrice = c.SpotCPU
  587. aws.BaseSpotRAMPrice = c.SpotRAM
  588. aws.BaseSpotGPUPrice = c.SpotGPU
  589. aws.SpotLabelName = c.SpotLabel
  590. aws.SpotLabelValue = c.SpotLabelValue
  591. aws.SpotDataBucket = c.SpotDataBucket
  592. aws.SpotDataPrefix = c.SpotDataPrefix
  593. aws.ProjectID = c.ProjectID
  594. aws.SpotDataRegion = c.SpotDataRegion
  595. aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
  596. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  597. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  598. }
  599. nodeList := aws.Clientset.GetAllNodes()
  600. inputkeys := make(map[string]bool)
  601. for _, n := range nodeList {
  602. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  603. aws.clusterManagementPrice = 0.10
  604. aws.clusterProvisioner = "EKS"
  605. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  606. aws.clusterProvisioner = "KOPS"
  607. }
  608. labels := n.GetObjectMeta().GetLabels()
  609. key := aws.GetKey(labels, n)
  610. inputkeys[key.Features()] = true
  611. }
  612. pvList := aws.Clientset.GetAllPersistentVolumes()
  613. storageClasses := aws.Clientset.GetAllStorageClasses()
  614. storageClassMap := make(map[string]map[string]string)
  615. for _, storageClass := range storageClasses {
  616. params := storageClass.Parameters
  617. storageClassMap[storageClass.ObjectMeta.Name] = params
  618. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  619. storageClassMap["default"] = params
  620. storageClassMap[""] = params
  621. }
  622. }
  623. pvkeys := make(map[string]PVKey)
  624. for _, pv := range pvList {
  625. params, ok := storageClassMap[pv.Spec.StorageClassName]
  626. if !ok {
  627. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  628. continue
  629. }
  630. key := aws.GetPVKey(pv, params, "")
  631. pvkeys[key.Features()] = key
  632. }
  633. // RIDataRunning establishes the existance of the goroutine. Since it's possible we
  634. // run multiple downloads, we don't want to create multiple go routines if one already exists
  635. if !aws.RIDataRunning && c.AthenaBucketName != "" {
  636. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  637. if err != nil {
  638. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  639. } else { // If we make one successful run, check on new reservation data every hour
  640. go func() {
  641. defer errors.HandlePanic()
  642. aws.RIDataRunning = true
  643. for {
  644. klog.Infof("Reserved Instance watcher running... next update in 1h")
  645. time.Sleep(time.Hour)
  646. err := aws.GetReservationDataFromAthena()
  647. if err != nil {
  648. klog.Infof("Error updating RI data: %s", err.Error())
  649. }
  650. }
  651. }()
  652. }
  653. }
  654. if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
  655. err = aws.GetSavingsPlanDataFromAthena()
  656. if err != nil {
  657. klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
  658. } else {
  659. go func() {
  660. defer errors.HandlePanic()
  661. aws.SavingsPlanDataRunning = true
  662. for {
  663. klog.Infof("Savings Plan watcher running... next update in 1h")
  664. time.Sleep(time.Hour)
  665. err := aws.GetSavingsPlanDataFromAthena()
  666. if err != nil {
  667. klog.Infof("Error updating Savings Plan data: %s", err.Error())
  668. }
  669. }
  670. }()
  671. }
  672. }
  673. aws.Pricing = make(map[string]*AWSProductTerms)
  674. aws.ValidPricingKeys = make(map[string]bool)
  675. skusToKeys := make(map[string]string)
  676. resp, pricingURL, err := aws.getRegionPricing(nodeList)
  677. if err != nil {
  678. return err
  679. }
  680. dec := json.NewDecoder(resp.Body)
  681. for {
  682. t, err := dec.Token()
  683. if err == io.EOF {
  684. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  685. break
  686. } else if err != nil {
  687. klog.V(2).Infof("error parsing response json %v", resp.Body)
  688. break
  689. }
  690. if t == "products" {
  691. _, err := dec.Token() // this should parse the opening "{""
  692. if err != nil {
  693. return err
  694. }
  695. for dec.More() {
  696. _, err := dec.Token() // the sku token
  697. if err != nil {
  698. return err
  699. }
  700. product := &AWSProduct{}
  701. err = dec.Decode(&product)
  702. if err != nil {
  703. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  704. break
  705. }
  706. if product.Attributes.PreInstalledSw == "NA" &&
  707. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
  708. product.Attributes.CapacityStatus == "Used" {
  709. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  710. spotKey := key + ",preemptible"
  711. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  712. productTerms := &AWSProductTerms{
  713. Sku: product.Sku,
  714. Memory: product.Attributes.Memory,
  715. Storage: product.Attributes.Storage,
  716. VCpu: product.Attributes.VCpu,
  717. GPU: product.Attributes.GPU,
  718. }
  719. aws.Pricing[key] = productTerms
  720. aws.Pricing[spotKey] = productTerms
  721. skusToKeys[product.Sku] = key
  722. }
  723. aws.ValidPricingKeys[key] = true
  724. aws.ValidPricingKeys[spotKey] = true
  725. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  726. // UsageTypes may be prefixed with a region code - we're removing this when using
  727. // volTypes to keep lookups generic
  728. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  729. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  730. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  731. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  732. spotKey := key + ",preemptible"
  733. pv := &PV{
  734. Class: volTypes[usageTypeNoRegion],
  735. Region: locationToRegion[product.Attributes.Location],
  736. }
  737. productTerms := &AWSProductTerms{
  738. Sku: product.Sku,
  739. PV: pv,
  740. }
  741. aws.Pricing[key] = productTerms
  742. aws.Pricing[spotKey] = productTerms
  743. skusToKeys[product.Sku] = key
  744. aws.ValidPricingKeys[key] = true
  745. aws.ValidPricingKeys[spotKey] = true
  746. }
  747. }
  748. }
  749. if t == "terms" {
  750. _, err := dec.Token() // this should parse the opening "{""
  751. if err != nil {
  752. return err
  753. }
  754. termType, err := dec.Token()
  755. if err != nil {
  756. return err
  757. }
  758. if termType == "OnDemand" {
  759. _, err := dec.Token()
  760. if err != nil { // again, should parse an opening "{"
  761. return err
  762. }
  763. for dec.More() {
  764. sku, err := dec.Token()
  765. if err != nil {
  766. return err
  767. }
  768. _, err = dec.Token() // another opening "{"
  769. if err != nil {
  770. return err
  771. }
  772. skuOnDemand, err := dec.Token()
  773. if err != nil {
  774. return err
  775. }
  776. offerTerm := &AWSOfferTerm{}
  777. err = dec.Decode(&offerTerm)
  778. if err != nil {
  779. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  780. }
  781. key, ok := skusToKeys[sku.(string)]
  782. spotKey := key + ",preemptible"
  783. if ok {
  784. aws.Pricing[key].OnDemand = offerTerm
  785. aws.Pricing[spotKey].OnDemand = offerTerm
  786. var cost string
  787. if sku.(string)+OnDemandRateCode == skuOnDemand {
  788. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  789. } else if sku.(string)+OnDemandRateCodeCn == skuOnDemand {
  790. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCodeCn+HourlyRateCodeCn].PricePerUnit.CNY
  791. }
  792. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  793. // If the specific UsageType is the per IO cost used on io1 volumes
  794. // we need to add the per IO cost to the io1 PV cost
  795. // Add the per IO cost to the PV object for the io1 volume type
  796. aws.Pricing[key].PV.CostPerIO = cost
  797. } else if strings.Contains(key, "EBS:Volume") {
  798. // If volume, we need to get hourly cost and add it to the PV object
  799. costFloat, _ := strconv.ParseFloat(cost, 64)
  800. hourlyPrice := costFloat / 730
  801. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  802. }
  803. }
  804. _, err = dec.Token()
  805. if err != nil {
  806. return err
  807. }
  808. }
  809. _, err = dec.Token()
  810. if err != nil {
  811. return err
  812. }
  813. }
  814. }
  815. }
  816. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  817. // Always run spot pricing refresh when performing download
  818. aws.refreshSpotPricing(true)
  819. // Only start a single refresh goroutine
  820. if !aws.SpotRefreshRunning {
  821. aws.SpotRefreshRunning = true
  822. go func() {
  823. defer errors.HandlePanic()
  824. for {
  825. klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  826. time.Sleep(SpotRefreshDuration)
  827. // Reoccurring refresh checks update times
  828. aws.refreshSpotPricing(false)
  829. }
  830. }()
  831. }
  832. return nil
  833. }
  834. func (aws *AWS) refreshSpotPricing(force bool) {
  835. aws.SpotPricingLock.Lock()
  836. defer aws.SpotPricingLock.Unlock()
  837. now := time.Now().UTC()
  838. updateTime := now.Add(-SpotRefreshDuration)
  839. // Return if there was an update time set and an hour hasn't elapsed
  840. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  841. return
  842. }
  843. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
  844. if err != nil {
  845. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  846. aws.SpotPricingError = err
  847. return
  848. }
  849. aws.SpotPricingError = nil
  850. // update time last updated
  851. aws.SpotPricingUpdatedAt = &now
  852. aws.SpotPricingByInstanceID = sp
  853. }
  854. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  855. func (aws *AWS) NetworkPricing() (*Network, error) {
  856. cpricing, err := aws.Config.GetCustomPricingData()
  857. if err != nil {
  858. return nil, err
  859. }
  860. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  861. if err != nil {
  862. return nil, err
  863. }
  864. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  865. if err != nil {
  866. return nil, err
  867. }
  868. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  869. if err != nil {
  870. return nil, err
  871. }
  872. return &Network{
  873. ZoneNetworkEgressCost: znec,
  874. RegionNetworkEgressCost: rnec,
  875. InternetNetworkEgressCost: inec,
  876. }, nil
  877. }
  878. func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
  879. fffrc := 0.025
  880. afrc := 0.010
  881. lbidc := 0.008
  882. numForwardingRules := 1.0
  883. dataIngressGB := 0.0
  884. var totalCost float64
  885. if numForwardingRules < 5 {
  886. totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
  887. } else {
  888. totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
  889. }
  890. return &LoadBalancer{
  891. Cost: totalCost,
  892. }, nil
  893. }
  894. // AllNodePricing returns all the billing data fetched.
  895. func (aws *AWS) AllNodePricing() (interface{}, error) {
  896. aws.DownloadPricingDataLock.RLock()
  897. defer aws.DownloadPricingDataLock.RUnlock()
  898. return aws.Pricing, nil
  899. }
  900. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  901. aws.SpotPricingLock.RLock()
  902. defer aws.SpotPricingLock.RUnlock()
  903. info, ok := aws.SpotPricingByInstanceID[instanceID]
  904. return info, ok
  905. }
  906. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  907. aws.RIDataLock.RLock()
  908. defer aws.RIDataLock.RUnlock()
  909. data, ok := aws.RIPricingByInstanceID[instanceID]
  910. return data, ok
  911. }
  912. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  913. aws.SavingsPlanDataLock.RLock()
  914. defer aws.SavingsPlanDataLock.RUnlock()
  915. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  916. return data, ok
  917. }
  918. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  919. key := k.Features()
  920. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  921. var spotcost string
  922. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  923. arr := strings.Split(spotInfo.Charge, " ")
  924. if len(arr) == 2 {
  925. spotcost = arr[0]
  926. } else {
  927. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  928. }
  929. return &Node{
  930. Cost: spotcost,
  931. VCPU: terms.VCpu,
  932. RAM: terms.Memory,
  933. GPU: terms.GPU,
  934. Storage: terms.Storage,
  935. BaseCPUPrice: aws.BaseCPUPrice,
  936. BaseRAMPrice: aws.BaseRAMPrice,
  937. BaseGPUPrice: aws.BaseGPUPrice,
  938. UsageType: PreemptibleType,
  939. }, nil
  940. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  941. log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
  942. return &Node{
  943. VCPU: terms.VCpu,
  944. VCPUCost: aws.BaseSpotCPUPrice,
  945. RAM: terms.Memory,
  946. GPU: terms.GPU,
  947. Storage: terms.Storage,
  948. BaseCPUPrice: aws.BaseCPUPrice,
  949. BaseRAMPrice: aws.BaseRAMPrice,
  950. BaseGPUPrice: aws.BaseGPUPrice,
  951. UsageType: PreemptibleType,
  952. }, nil
  953. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  954. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  955. return &Node{
  956. Cost: strCost,
  957. VCPU: terms.VCpu,
  958. RAM: terms.Memory,
  959. GPU: terms.GPU,
  960. Storage: terms.Storage,
  961. BaseCPUPrice: aws.BaseCPUPrice,
  962. BaseRAMPrice: aws.BaseRAMPrice,
  963. BaseGPUPrice: aws.BaseGPUPrice,
  964. UsageType: usageType,
  965. }, nil
  966. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  967. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  968. return &Node{
  969. Cost: strCost,
  970. VCPU: terms.VCpu,
  971. RAM: terms.Memory,
  972. GPU: terms.GPU,
  973. Storage: terms.Storage,
  974. BaseCPUPrice: aws.BaseCPUPrice,
  975. BaseRAMPrice: aws.BaseRAMPrice,
  976. BaseGPUPrice: aws.BaseGPUPrice,
  977. UsageType: usageType,
  978. }, nil
  979. }
  980. var cost string
  981. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  982. if ok {
  983. cost = c.PricePerUnit.USD
  984. } else {
  985. // Check for Chinese pricing before throwing error
  986. c, ok = terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCodeCn+HourlyRateCodeCn]
  987. if ok {
  988. cost = c.PricePerUnit.CNY
  989. } else {
  990. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  991. }
  992. }
  993. return &Node{
  994. Cost: cost,
  995. VCPU: terms.VCpu,
  996. RAM: terms.Memory,
  997. GPU: terms.GPU,
  998. Storage: terms.Storage,
  999. BaseCPUPrice: aws.BaseCPUPrice,
  1000. BaseRAMPrice: aws.BaseRAMPrice,
  1001. BaseGPUPrice: aws.BaseGPUPrice,
  1002. UsageType: usageType,
  1003. }, nil
  1004. }
  1005. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  1006. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  1007. aws.DownloadPricingDataLock.RLock()
  1008. defer aws.DownloadPricingDataLock.RUnlock()
  1009. key := k.Features()
  1010. usageType := "ondemand"
  1011. if aws.isPreemptible(key) {
  1012. usageType = PreemptibleType
  1013. }
  1014. terms, ok := aws.Pricing[key]
  1015. if ok {
  1016. return aws.createNode(terms, usageType, k)
  1017. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  1018. aws.DownloadPricingDataLock.RUnlock()
  1019. err := aws.DownloadPricingData()
  1020. aws.DownloadPricingDataLock.RLock()
  1021. if err != nil {
  1022. return &Node{
  1023. Cost: aws.BaseCPUPrice,
  1024. BaseCPUPrice: aws.BaseCPUPrice,
  1025. BaseRAMPrice: aws.BaseRAMPrice,
  1026. BaseGPUPrice: aws.BaseGPUPrice,
  1027. UsageType: usageType,
  1028. UsesBaseCPUPrice: true,
  1029. }, err
  1030. }
  1031. terms, termsOk := aws.Pricing[key]
  1032. if !termsOk {
  1033. return &Node{
  1034. Cost: aws.BaseCPUPrice,
  1035. BaseCPUPrice: aws.BaseCPUPrice,
  1036. BaseRAMPrice: aws.BaseRAMPrice,
  1037. BaseGPUPrice: aws.BaseGPUPrice,
  1038. UsageType: usageType,
  1039. UsesBaseCPUPrice: true,
  1040. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  1041. }
  1042. return aws.createNode(terms, usageType, k)
  1043. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  1044. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  1045. }
  1046. }
  1047. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  1048. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  1049. defaultClusterName := "AWS Cluster #1"
  1050. c, err := awsProvider.GetConfig()
  1051. if err != nil {
  1052. return nil, err
  1053. }
  1054. remoteEnabled := env.IsRemoteEnabled()
  1055. if c.ClusterName != "" {
  1056. m := make(map[string]string)
  1057. m["name"] = c.ClusterName
  1058. m["provider"] = "AWS"
  1059. m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable else where
  1060. m["region"] = awsProvider.clusterRegion
  1061. m["id"] = env.GetClusterID()
  1062. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1063. m["provisioner"] = awsProvider.clusterProvisioner
  1064. return m, nil
  1065. }
  1066. makeStructure := func(clusterName string) (map[string]string, error) {
  1067. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  1068. m := make(map[string]string)
  1069. m["name"] = clusterName
  1070. m["provider"] = "AWS"
  1071. m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable else where
  1072. m["region"] = awsProvider.clusterRegion
  1073. m["id"] = env.GetClusterID()
  1074. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1075. return m, nil
  1076. }
  1077. maybeClusterId := env.GetAWSClusterID()
  1078. if len(maybeClusterId) != 0 {
  1079. return makeStructure(maybeClusterId)
  1080. }
  1081. // TODO: This should be cached, it can take a long time to hit the API
  1082. //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  1083. //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  1084. //klog.Infof("nodelist get here %s", time.Now())
  1085. //nodeList := awsProvider.Clientset.GetAllNodes()
  1086. //klog.Infof("nodelist done here %s", time.Now())
  1087. /*for _, n := range nodeList {
  1088. region := ""
  1089. instanceId := ""
  1090. providerId := n.Spec.ProviderID
  1091. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  1092. if matchNum == 1 {
  1093. region = group
  1094. } else if matchNum == 2 {
  1095. instanceId = group
  1096. }
  1097. }
  1098. if len(instanceId) == 0 {
  1099. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  1100. continue
  1101. }
  1102. c := &aws.Config{
  1103. Region: aws.String(region),
  1104. }
  1105. s := session.Must(session.NewSession(c))
  1106. ec2Svc := ec2.New(s)
  1107. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  1108. InstanceIds: []*string{
  1109. aws.String(instanceId),
  1110. },
  1111. })
  1112. if diErr != nil {
  1113. klog.Infof("Error describing instances: %s", diErr)
  1114. continue
  1115. }
  1116. if len(di.Reservations) != 1 {
  1117. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  1118. continue
  1119. }
  1120. res := di.Reservations[0]
  1121. if len(res.Instances) != 1 {
  1122. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  1123. continue
  1124. }
  1125. inst := res.Instances[0]
  1126. for _, tag := range inst.Tags {
  1127. tagKey := *tag.Key
  1128. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  1129. if matchNum != 1 {
  1130. continue
  1131. }
  1132. return makeStructure(group)
  1133. }
  1134. }
  1135. }*/
  1136. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  1137. return makeStructure(defaultClusterName)
  1138. }
  1139. // updates the authentication to the latest values (via config or secret)
  1140. func (aws *AWS) ConfigureAuth() error {
  1141. c, err := aws.Config.GetCustomPricingData()
  1142. if err != nil {
  1143. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  1144. }
  1145. return aws.ConfigureAuthWith(c)
  1146. }
  1147. // updates the authentication to the latest values (via config or secret)
  1148. func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
  1149. accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
  1150. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1151. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1152. if err != nil {
  1153. return err
  1154. }
  1155. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1156. if err != nil {
  1157. return err
  1158. }
  1159. }
  1160. return nil
  1161. }
  1162. // Gets the aws key id and secret
  1163. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  1164. if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
  1165. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1166. }
  1167. // 1. Check config values first (set from frontend UI)
  1168. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  1169. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1170. Message: "AWS ServiceKey exists",
  1171. Status: true,
  1172. }
  1173. return cp.ServiceKeyName, cp.ServiceKeySecret
  1174. }
  1175. // 2. Check for secret
  1176. s, _ := aws.loadAWSAuthSecret(forceReload)
  1177. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1178. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1179. Message: "AWS ServiceKey exists",
  1180. Status: true,
  1181. }
  1182. return s.AccessKeyID, s.SecretAccessKey
  1183. }
  1184. // 3. Fall back to env vars
  1185. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1186. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1187. Message: "AWS ServiceKey exists",
  1188. Status: false,
  1189. }
  1190. } else {
  1191. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1192. Message: "AWS ServiceKey exists",
  1193. Status: true,
  1194. }
  1195. }
  1196. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1197. }
  1198. // Load once and cache the result (even on failure). This is an install time secret, so
  1199. // we don't expect the secret to change. If it does, however, we can force reload using
  1200. // the input parameter.
  1201. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1202. if !force && loadedAWSSecret {
  1203. return awsSecret, nil
  1204. }
  1205. loadedAWSSecret = true
  1206. exists, err := fileutil.FileExists(authSecretPath)
  1207. if !exists || err != nil {
  1208. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1209. }
  1210. result, err := ioutil.ReadFile(authSecretPath)
  1211. if err != nil {
  1212. return nil, err
  1213. }
  1214. var ak AWSAccessKey
  1215. err = json.Unmarshal(result, &ak)
  1216. if err != nil {
  1217. return nil, err
  1218. }
  1219. awsSecret = &ak
  1220. return awsSecret, nil
  1221. }
  1222. func getClusterConfig(ccFile string) (map[string]string, error) {
  1223. clusterConfig, err := os.Open(ccFile)
  1224. if err != nil {
  1225. return nil, err
  1226. }
  1227. defer clusterConfig.Close()
  1228. b, err := ioutil.ReadAll(clusterConfig)
  1229. if err != nil {
  1230. return nil, err
  1231. }
  1232. var clusterConf map[string]string
  1233. err = json.Unmarshal([]byte(b), &clusterConf)
  1234. if err != nil {
  1235. return nil, err
  1236. }
  1237. return clusterConf, nil
  1238. }
  1239. func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
  1240. sess, err := session.NewSession(&aws.Config{
  1241. Region: aws.String(region),
  1242. Credentials: credentials.NewEnvCredentials(),
  1243. })
  1244. if err != nil {
  1245. return nil, err
  1246. }
  1247. ec2Svc := ec2.New(sess)
  1248. return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
  1249. }
  1250. func (a *AWS) GetAddresses() ([]byte, error) {
  1251. a.ConfigureAuth() // load authentication data into env vars
  1252. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1253. errorCh := make(chan error, len(awsRegions))
  1254. var wg sync.WaitGroup
  1255. wg.Add(len(awsRegions))
  1256. // Get volumes from each AWS region
  1257. for _, r := range awsRegions {
  1258. // Fetch IP address response and send results and errors to their
  1259. // respective channels
  1260. go func(region string) {
  1261. defer wg.Done()
  1262. defer errors.HandlePanic()
  1263. // Query for first page of volume results
  1264. resp, err := a.getAddressesForRegion(region)
  1265. if err != nil {
  1266. if aerr, ok := err.(awserr.Error); ok {
  1267. switch aerr.Code() {
  1268. default:
  1269. errorCh <- aerr
  1270. }
  1271. return
  1272. } else {
  1273. errorCh <- err
  1274. return
  1275. }
  1276. }
  1277. addressCh <- resp
  1278. }(r)
  1279. }
  1280. // Close the result channels after everything has been sent
  1281. go func() {
  1282. defer errors.HandlePanic()
  1283. wg.Wait()
  1284. close(errorCh)
  1285. close(addressCh)
  1286. }()
  1287. addresses := []*ec2.Address{}
  1288. for adds := range addressCh {
  1289. addresses = append(addresses, adds.Addresses...)
  1290. }
  1291. errors := []error{}
  1292. for err := range errorCh {
  1293. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1294. errors = append(errors, err)
  1295. }
  1296. // Return error if no addresses are returned
  1297. if len(errors) > 0 && len(addresses) == 0 {
  1298. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
  1299. }
  1300. // Format the response this way to match the JSON-encoded formatting of a single response
  1301. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1302. // a "Addresss" key at the top level.
  1303. return json.Marshal(map[string][]*ec2.Address{
  1304. "Addresses": addresses,
  1305. })
  1306. }
  1307. func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1308. sess, err := session.NewSession(&aws.Config{
  1309. Region: aws.String(region),
  1310. Credentials: credentials.NewEnvCredentials(),
  1311. })
  1312. if err != nil {
  1313. return nil, err
  1314. }
  1315. ec2Svc := ec2.New(sess)
  1316. return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
  1317. MaxResults: &maxResults,
  1318. NextToken: nextToken,
  1319. })
  1320. }
  1321. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1322. func (a *AWS) GetDisks() ([]byte, error) {
  1323. a.ConfigureAuth() // load authentication data into env vars
  1324. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1325. errorCh := make(chan error, len(awsRegions))
  1326. var wg sync.WaitGroup
  1327. wg.Add(len(awsRegions))
  1328. // Get volumes from each AWS region
  1329. for _, r := range awsRegions {
  1330. // Fetch volume response and send results and errors to their
  1331. // respective channels
  1332. go func(region string) {
  1333. defer wg.Done()
  1334. defer errors.HandlePanic()
  1335. // Query for first page of volume results
  1336. resp, err := a.getDisksForRegion(region, 1000, nil)
  1337. if err != nil {
  1338. if aerr, ok := err.(awserr.Error); ok {
  1339. switch aerr.Code() {
  1340. default:
  1341. errorCh <- aerr
  1342. }
  1343. return
  1344. } else {
  1345. errorCh <- err
  1346. return
  1347. }
  1348. }
  1349. volumeCh <- resp
  1350. // A NextToken indicates more pages of results. Keep querying
  1351. // until all pages are retrieved.
  1352. for resp.NextToken != nil {
  1353. resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
  1354. if err != nil {
  1355. if aerr, ok := err.(awserr.Error); ok {
  1356. switch aerr.Code() {
  1357. default:
  1358. errorCh <- aerr
  1359. }
  1360. return
  1361. } else {
  1362. errorCh <- err
  1363. return
  1364. }
  1365. }
  1366. volumeCh <- resp
  1367. }
  1368. }(r)
  1369. }
  1370. // Close the result channels after everything has been sent
  1371. go func() {
  1372. defer errors.HandlePanic()
  1373. wg.Wait()
  1374. close(errorCh)
  1375. close(volumeCh)
  1376. }()
  1377. volumes := []*ec2.Volume{}
  1378. for vols := range volumeCh {
  1379. volumes = append(volumes, vols.Volumes...)
  1380. }
  1381. errors := []error{}
  1382. for err := range errorCh {
  1383. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1384. errors = append(errors, err)
  1385. }
  1386. // Return error if no volumes are returned
  1387. if len(errors) > 0 && len(volumes) == 0 {
  1388. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
  1389. }
  1390. // Format the response this way to match the JSON-encoded formatting of a single response
  1391. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1392. // a "Volumes" key at the top level.
  1393. return json.Marshal(map[string][]*ec2.Volume{
  1394. "Volumes": volumes,
  1395. })
  1396. }
  1397. func generateAWSGroupBy(lastIdx int) string {
  1398. sequence := []string{}
  1399. for i := 1; i < lastIdx+1; i++ {
  1400. sequence = append(sequence, strconv.Itoa(i))
  1401. }
  1402. return strings.Join(sequence, ",")
  1403. }
  1404. func (a *AWS) QueryAthenaPaginated(query string, fn func(*athena.GetQueryResultsOutput, bool) bool) error {
  1405. customPricing, err := a.GetConfig()
  1406. if err != nil {
  1407. return err
  1408. }
  1409. a.ConfigureAuthWith(customPricing)
  1410. region := aws.String(customPricing.AthenaRegion)
  1411. resultsBucket := customPricing.AthenaBucketName
  1412. database := customPricing.AthenaDatabase
  1413. c := &aws.Config{
  1414. Region: region,
  1415. STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
  1416. }
  1417. s := session.Must(session.NewSession(c))
  1418. svc := athena.New(s)
  1419. if customPricing.MasterPayerARN != "" {
  1420. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1421. svc = athena.New(s, &aws.Config{
  1422. Region: region,
  1423. Credentials: creds,
  1424. })
  1425. }
  1426. var e athena.StartQueryExecutionInput
  1427. var r athena.ResultConfiguration
  1428. r.SetOutputLocation(resultsBucket)
  1429. e.SetResultConfiguration(&r)
  1430. e.SetQueryString(query)
  1431. var q athena.QueryExecutionContext
  1432. q.SetDatabase(database)
  1433. e.SetQueryExecutionContext(&q)
  1434. res, err := svc.StartQueryExecution(&e)
  1435. if err != nil {
  1436. return err
  1437. }
  1438. klog.V(2).Infof("StartQueryExecution result:")
  1439. klog.V(2).Infof(res.GoString())
  1440. var qri athena.GetQueryExecutionInput
  1441. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1442. var qrop *athena.GetQueryExecutionOutput
  1443. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1444. for {
  1445. qrop, err = svc.GetQueryExecution(&qri)
  1446. if err != nil {
  1447. return err
  1448. }
  1449. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1450. break
  1451. }
  1452. time.Sleep(duration)
  1453. }
  1454. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1455. var ip athena.GetQueryResultsInput
  1456. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1457. err = svc.GetQueryResultsPages(&ip, fn)
  1458. if err != nil {
  1459. return fmt.Errorf("queryAthenaPaginated: error getting query resultsPages from athena service %s", err)
  1460. }
  1461. return nil
  1462. } else {
  1463. return fmt.Errorf("No results available for %s", query)
  1464. }
  1465. }
// SavingsPlanData holds the savings plan information for one resource, built
// from a single row of the Athena savings plan query.
type SavingsPlanData struct {
	// ResourceID is the row's line_item_resource_id; it is also the key under
	// which the record is stored.
	ResourceID string
	// EffectiveCost is the row's savings_plan_savings_plan_rate parsed as a
	// float.
	EffectiveCost float64
	// SavingsPlanARN is the row's savings_plan_savings_plan_a_r_n.
	SavingsPlanARN string
	// MostRecentDate is the row's line_item_usage_start_date.
	MostRecentDate string
}
  1472. func (a *AWS) GetSavingsPlanDataFromAthena() error {
  1473. cfg, err := a.GetConfig()
  1474. if err != nil {
  1475. return err
  1476. }
  1477. if cfg.AthenaBucketName == "" {
  1478. return fmt.Errorf("No Athena Bucket configured")
  1479. }
  1480. if a.SavingsPlanDataByInstanceID == nil {
  1481. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1482. }
  1483. tNow := time.Now()
  1484. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1485. start := tOneDayAgo.Format("2006-01-02")
  1486. end := tNow.Format("2006-01-02")
  1487. // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
  1488. //
  1489. q := `SELECT
  1490. line_item_usage_start_date,
  1491. savings_plan_savings_plan_a_r_n,
  1492. line_item_resource_id,
  1493. savings_plan_savings_plan_rate
  1494. FROM %s as cost_data
  1495. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1496. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1497. line_item_usage_start_date DESC`
  1498. page := 0
  1499. processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
  1500. a.SavingsPlanDataLock.Lock()
  1501. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
  1502. mostRecentDate := ""
  1503. iter := op.ResultSet.Rows
  1504. if page == 0 && len(iter) > 0 {
  1505. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1506. }
  1507. page++
  1508. for _, r := range iter {
  1509. d := *r.Data[0].VarCharValue
  1510. if mostRecentDate == "" {
  1511. mostRecentDate = d
  1512. } else if mostRecentDate != d { // Get all most recent assignments
  1513. break
  1514. }
  1515. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1516. if err != nil {
  1517. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1518. }
  1519. r := &SavingsPlanData{
  1520. ResourceID: *r.Data[2].VarCharValue,
  1521. EffectiveCost: cost,
  1522. SavingsPlanARN: *r.Data[1].VarCharValue,
  1523. MostRecentDate: d,
  1524. }
  1525. a.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1526. }
  1527. klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
  1528. for k, r := range a.SavingsPlanDataByInstanceID {
  1529. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1530. }
  1531. a.SavingsPlanDataLock.Unlock()
  1532. return true
  1533. }
  1534. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1535. klog.V(3).Infof("Running Query: %s", query)
  1536. err = a.QueryAthenaPaginated(query, processResults)
  1537. if err != nil {
  1538. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1539. }
  1540. return nil
  1541. }
  // RIData is one reserved-instance usage row for a resource, as read from
  // Athena by GetReservationDataFromAthena and stored in
  // AWS.RIPricingByInstanceID.
  type RIData struct {
  	// ResourceID is the line_item_resource_id column; it is also the map key.
  	ResourceID string
  	// EffectiveCost is the reservation_effective_cost column parsed as a float.
  	EffectiveCost float64
  	// ReservationARN is the reservation_reservation_a_r_n column.
  	ReservationARN string
  	// MostRecentDate is the line_item_usage_start_date of the row, kept as the
  	// raw string returned by Athena.
  	MostRecentDate string
  }
  1548. func (a *AWS) GetReservationDataFromAthena() error {
  1549. cfg, err := a.GetConfig()
  1550. if err != nil {
  1551. return err
  1552. }
  1553. if cfg.AthenaBucketName == "" {
  1554. return fmt.Errorf("No Athena Bucket configured")
  1555. }
  1556. // Query for all column names in advance in order to validate configured
  1557. // label columns
  1558. columns, _ := a.ShowAthenaColumns()
  1559. if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
  1560. klog.Infof("No reserved data available in Athena")
  1561. a.RIPricingError = nil
  1562. }
  1563. if a.RIPricingByInstanceID == nil {
  1564. a.RIPricingByInstanceID = make(map[string]*RIData)
  1565. }
  1566. tNow := time.Now()
  1567. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1568. start := tOneDayAgo.Format("2006-01-02")
  1569. end := tNow.Format("2006-01-02")
  1570. q := `SELECT
  1571. line_item_usage_start_date,
  1572. reservation_reservation_a_r_n,
  1573. line_item_resource_id,
  1574. reservation_effective_cost
  1575. FROM %s as cost_data
  1576. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1577. AND reservation_reservation_a_r_n <> '' ORDER BY
  1578. line_item_usage_start_date DESC`
  1579. page := 0
  1580. processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
  1581. a.RIDataLock.Lock()
  1582. a.RIPricingByInstanceID = make(map[string]*RIData) // Clean out the old data and only report a RI price if its in the most recent run.
  1583. mostRecentDate := ""
  1584. iter := op.ResultSet.Rows
  1585. if page == 0 && len(iter) > 0 {
  1586. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1587. }
  1588. page++
  1589. for _, r := range iter {
  1590. d := *r.Data[0].VarCharValue
  1591. if mostRecentDate == "" {
  1592. mostRecentDate = d
  1593. } else if mostRecentDate != d { // Get all most recent assignments
  1594. break
  1595. }
  1596. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1597. if err != nil {
  1598. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1599. }
  1600. r := &RIData{
  1601. ResourceID: *r.Data[2].VarCharValue,
  1602. EffectiveCost: cost,
  1603. ReservationARN: *r.Data[1].VarCharValue,
  1604. MostRecentDate: d,
  1605. }
  1606. a.RIPricingByInstanceID[r.ResourceID] = r
  1607. }
  1608. klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
  1609. for k, r := range a.RIPricingByInstanceID {
  1610. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1611. }
  1612. a.RIDataLock.Unlock()
  1613. return true
  1614. }
  1615. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1616. klog.V(3).Infof("Running Query: %s", query)
  1617. err = a.QueryAthenaPaginated(query, processResults)
  1618. if err != nil {
  1619. a.RIPricingError = err
  1620. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1621. }
  1622. a.RIPricingError = nil
  1623. return nil
  1624. }
  1625. // ShowAthenaColumns returns a list of the names of all columns in the configured
  1626. // Athena tables
  1627. func (aws *AWS) ShowAthenaColumns() (map[string]bool, error) {
  1628. columnSet := map[string]bool{}
  1629. // Configure Athena query
  1630. cfg, err := aws.GetConfig()
  1631. if err != nil {
  1632. return nil, err
  1633. }
  1634. if cfg.AthenaTable == "" {
  1635. return nil, fmt.Errorf("AthenaTable not configured")
  1636. }
  1637. if cfg.AthenaBucketName == "" {
  1638. return nil, fmt.Errorf("AthenaBucketName not configured")
  1639. }
  1640. q := `SHOW COLUMNS IN %s`
  1641. query := fmt.Sprintf(q, cfg.AthenaTable)
  1642. columns := []string{}
  1643. pageNum := 0
  1644. processResults := func(page *athena.GetQueryResultsOutput, lastpage bool) bool {
  1645. for _, row := range page.ResultSet.Rows {
  1646. columns = append(columns, *row.Data[0].VarCharValue)
  1647. }
  1648. pageNum++
  1649. return true
  1650. }
  1651. err = aws.QueryAthenaPaginated(query, processResults)
  1652. if err != nil {
  1653. log.Warningf("Error getting Athena columns: %s", err)
  1654. return columnSet, err
  1655. }
  1656. for _, col := range columns {
  1657. columnSet[col] = true
  1658. }
  1659. return columnSet, nil
  1660. }
  // spotInfo is one record of a spot data feed file as decoded by parseSpotData.
  // All columns are kept as the raw strings found in the file.
  //
  // The field order matters: parseSpotData derives the expected column order and
  // the number of fields per record from this struct via csvutil.Header, so
  // fields must not be reordered, added or removed without checking the feed
  // format.
  type spotInfo struct {
  	Timestamp   string `csv:"Timestamp"`
  	UsageType   string `csv:"UsageType"`
  	Operation   string `csv:"Operation"`
  	InstanceID  string `csv:"InstanceID"` // key of the map returned by parseSpotData
  	MyBidID     string `csv:"MyBidID"`
  	MyMaxPrice  string `csv:"MyMaxPrice"`
  	MarketPrice string `csv:"MarketPrice"`
  	Charge      string `csv:"Charge"`
  	Version     string `csv:"Version"`
  }
  1672. func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
  1673. if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
  1674. a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1675. }
  1676. a.ConfigureAuth() // configure aws api authentication by setting env vars
  1677. s3Prefix := projectID
  1678. if len(prefix) != 0 {
  1679. s3Prefix = prefix + "/" + s3Prefix
  1680. }
  1681. c := aws.NewConfig().WithRegion(region)
  1682. s := session.Must(session.NewSession(c))
  1683. s3Svc := s3.New(s)
  1684. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1685. tNow := time.Now()
  1686. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1687. ls := &s3.ListObjectsInput{
  1688. Bucket: aws.String(bucket),
  1689. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1690. }
  1691. ls2 := &s3.ListObjectsInput{
  1692. Bucket: aws.String(bucket),
  1693. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1694. }
  1695. lso, err := s3Svc.ListObjects(ls)
  1696. if err != nil {
  1697. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1698. Message: "Bucket List Permissions Available",
  1699. Status: false,
  1700. AdditionalInfo: err.Error(),
  1701. }
  1702. return nil, err
  1703. } else {
  1704. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1705. Message: "Bucket List Permissions Available",
  1706. Status: true,
  1707. }
  1708. }
  1709. lsoLen := len(lso.Contents)
  1710. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1711. if lsoLen == 0 {
  1712. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1713. }
  1714. lso2, err := s3Svc.ListObjects(ls2)
  1715. if err != nil {
  1716. return nil, err
  1717. }
  1718. lso2Len := len(lso2.Contents)
  1719. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1720. if lso2Len == 0 {
  1721. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1722. }
  1723. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1724. var keys []*string
  1725. for _, obj := range lso.Contents {
  1726. keys = append(keys, obj.Key)
  1727. }
  1728. for _, obj := range lso2.Contents {
  1729. keys = append(keys, obj.Key)
  1730. }
  1731. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1732. header, err := csvutil.Header(spotInfo{}, "csv")
  1733. if err != nil {
  1734. return nil, err
  1735. }
  1736. fieldsPerRecord := len(header)
  1737. spots := make(map[string]*spotInfo)
  1738. for _, key := range keys {
  1739. getObj := &s3.GetObjectInput{
  1740. Bucket: aws.String(bucket),
  1741. Key: key,
  1742. }
  1743. buf := aws.NewWriteAtBuffer([]byte{})
  1744. _, err := downloader.Download(buf, getObj)
  1745. if err != nil {
  1746. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1747. Message: "Object Get Permissions Available",
  1748. Status: false,
  1749. AdditionalInfo: err.Error(),
  1750. }
  1751. return nil, err
  1752. } else {
  1753. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1754. Message: "Object Get Permissions Available",
  1755. Status: true,
  1756. }
  1757. }
  1758. r := bytes.NewReader(buf.Bytes())
  1759. gr, err := gzip.NewReader(r)
  1760. if err != nil {
  1761. return nil, err
  1762. }
  1763. csvReader := csv.NewReader(gr)
  1764. csvReader.Comma = '\t'
  1765. csvReader.FieldsPerRecord = fieldsPerRecord
  1766. dec, err := csvutil.NewDecoder(csvReader, header...)
  1767. if err != nil {
  1768. return nil, err
  1769. }
  1770. var foundVersion string
  1771. for {
  1772. spot := spotInfo{}
  1773. err := dec.Decode(&spot)
  1774. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1775. if err == io.EOF {
  1776. break
  1777. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1778. rec := dec.Record()
  1779. // the first two "Record()" will be the comment lines
  1780. // and they show up as len() == 1
  1781. // the first of which is "#Version"
  1782. // the second of which is "#Fields: "
  1783. if len(rec) != 1 {
  1784. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1785. continue
  1786. }
  1787. if len(foundVersion) == 0 {
  1788. spotFeedVersion := rec[0]
  1789. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1790. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1791. if matches != nil {
  1792. foundVersion = matches[1]
  1793. if foundVersion != supportedSpotFeedVersion {
  1794. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1795. break
  1796. }
  1797. }
  1798. continue
  1799. } else if strings.Index(rec[0], "#") == 0 {
  1800. continue
  1801. } else {
  1802. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1803. continue
  1804. }
  1805. } else if err != nil {
  1806. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1807. continue
  1808. }
  1809. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  1810. spots[spot.InstanceID] = &spot
  1811. }
  1812. gr.Close()
  1813. }
  1814. return spots, nil
  1815. }
  // ApplyReservedInstancePricing is a no-op for AWS: the body is empty and nodes
  // is left untouched.
  // NOTE(review): presumably kept only to satisfy a provider interface — confirm.
  func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  }
  1818. func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  1819. checks := []*ServiceAccountCheck{}
  1820. for _, v := range a.ServiceAccountChecks {
  1821. checks = append(checks, v)
  1822. }
  1823. return &ServiceAccountStatus{
  1824. Checks: checks,
  1825. }
  1826. }
  1827. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  1828. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  1829. }
  // Regions returns the package-level awsRegions slice. The slice is handed out
  // as is, not copied, so a caller that modifies it modifies the shared list.
  func (aws *AWS) Regions() []string {
  	return awsRegions
  }