awsprovider.go 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/csv"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "k8s.io/klog"
  17. "github.com/kubecost/cost-model/pkg/clustercache"
  18. "github.com/kubecost/cost-model/pkg/env"
  19. "github.com/kubecost/cost-model/pkg/errors"
  20. "github.com/kubecost/cost-model/pkg/log"
  21. "github.com/kubecost/cost-model/pkg/util"
  22. "github.com/kubecost/cost-model/pkg/util/fileutil"
  23. "github.com/kubecost/cost-model/pkg/util/json"
  24. awsSDK "github.com/aws/aws-sdk-go-v2/aws"
  25. "github.com/aws/aws-sdk-go-v2/config"
  26. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  27. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  28. "github.com/aws/aws-sdk-go-v2/service/athena"
  29. athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
  30. "github.com/aws/aws-sdk-go-v2/service/ec2"
  31. ec2Types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
  32. "github.com/aws/aws-sdk-go-v2/service/s3"
  33. "github.com/aws/aws-sdk-go-v2/service/sts"
  34. "github.com/jszwec/csvutil"
  35. v1 "k8s.io/api/core/v1"
  36. )
  // Identifiers used by the AWS provider for config update payloads, spot keys
// and pricing-source status reporting.
const (
	// supportedSpotFeedVersion is presumably the spot data feed format version
	// this provider accepts — confirm against the spot feed parser.
	supportedSpotFeedVersion = "1"

	// SpotInfoUpdateType selects the AwsSpotFeedInfo payload in UpdateConfig.
	SpotInfoUpdateType = "spotinfo"
	// AthenaInfoUpdateType selects the AwsAthenaInfo payload in UpdateConfig.
	AthenaInfoUpdateType = "athenainfo"

	// PreemptibleType is the trailing component of a spot node's pricing key.
	PreemptibleType = "preemptible"

	// APIPricingSource names the public pricing API as a pricing source.
	APIPricingSource = "Public API"
	// SpotPricingSource names the spot data feed in PricingSourceStatus.
	SpotPricingSource = "Spot Data Feed"
	// ReservedInstancePricingSource names the RI / savings plan source in PricingSourceStatus.
	ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
)
  44. func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
  45. sources := make(map[string]*PricingSource)
  46. sps := &PricingSource{
  47. Name: SpotPricingSource,
  48. }
  49. if !aws.SpotRefreshEnabled {
  50. sps.Available = false
  51. sps.Error = "Spot instances not set up"
  52. } else {
  53. sps.Error = ""
  54. if aws.SpotPricingError != nil {
  55. sps.Error = aws.SpotPricingError.Error()
  56. }
  57. if sps.Error != "" {
  58. sps.Available = false
  59. } else if len(aws.SpotPricingByInstanceID) > 0 {
  60. sps.Available = true
  61. } else {
  62. sps.Error = "No spot instances detected"
  63. }
  64. }
  65. sources[SpotPricingSource] = sps
  66. rps := &PricingSource{
  67. Name: ReservedInstancePricingSource,
  68. }
  69. rps.Error = ""
  70. if aws.RIPricingError != nil {
  71. rps.Error = aws.RIPricingError.Error()
  72. }
  73. if rps.Error != "" {
  74. rps.Available = false
  75. } else {
  76. rps.Available = true
  77. }
  78. sources[ReservedInstancePricingSource] = rps
  79. return sources
  80. }
  const (
	// SpotRefreshDuration is how often spot data is refreshed.
	SpotRefreshDuration = 15 * time.Minute

	// defaultConfigPath is the default configuration directory (presumably
	// where provider config files live — confirm with ProviderConfig).
	defaultConfigPath = "/var/configs/"
)
  // awsRegions lists the AWS region identifiers known to this provider. The order
// is preserved from the original list.
var awsRegions = []string{
	// United States
	"us-east-2", "us-east-1", "us-west-1", "us-west-2",
	// Asia Pacific
	"ap-east-1", "ap-south-1", "ap-northeast-3", "ap-northeast-2",
	"ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "ap-southeast-3",
	// Canada
	"ca-central-1",
	// China
	"cn-north-1", "cn-northwest-1",
	// Europe
	"eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-north-1", "eu-south-1",
	// Middle East, South America, Africa
	"me-south-1", "sa-east-1", "af-south-1",
	// GovCloud
	"us-gov-east-1", "us-gov-west-1",
}
  112. // AWS represents an Amazon Provider
  113. type AWS struct {
  114. Pricing map[string]*AWSProductTerms
  115. SpotPricingByInstanceID map[string]*spotInfo
  116. SpotPricingUpdatedAt *time.Time
  117. SpotRefreshEnabled bool
  118. SpotRefreshRunning bool
  119. SpotPricingLock sync.RWMutex
  120. SpotPricingError error
  121. RIPricingByInstanceID map[string]*RIData
  122. RIPricingError error
  123. RIDataRunning bool
  124. RIDataLock sync.RWMutex
  125. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  126. SavingsPlanDataRunning bool
  127. SavingsPlanDataLock sync.RWMutex
  128. ValidPricingKeys map[string]bool
  129. Clientset clustercache.ClusterCache
  130. BaseCPUPrice string
  131. BaseRAMPrice string
  132. BaseGPUPrice string
  133. BaseSpotCPUPrice string
  134. BaseSpotRAMPrice string
  135. BaseSpotGPUPrice string
  136. SpotLabelName string
  137. SpotLabelValue string
  138. SpotDataRegion string
  139. SpotDataBucket string
  140. SpotDataPrefix string
  141. ProjectID string
  142. DownloadPricingDataLock sync.RWMutex
  143. Config *ProviderConfig
  144. serviceAccountChecks *ServiceAccountChecks
  145. clusterManagementPrice float64
  146. clusterAccountId string
  147. clusterRegion string
  148. clusterProvisioner string
  149. *CustomProvider
  150. }
  // AWSAccessKey is a static AWS key pair. Through its Retrieve method it
// satisfies the awsV2.CredentialsProvider interface.
type AWSAccessKey struct {
	AccessKeyID     string `json:"aws_access_key_id"`     // access key ID
	SecretAccessKey string `json:"aws_secret_access_key"` // secret access key
}
  156. // Retrieve returns a set of awsV2 credentials using the AWSAccessKey's key and secret.
  157. // This fulfils the awsV2.CredentialsProvider interface contract.
  158. func (accessKey AWSAccessKey) Retrieve(ctx context.Context) (awsSDK.Credentials, error) {
  159. return awsSDK.Credentials{
  160. AccessKeyID: accessKey.AccessKeyID,
  161. SecretAccessKey: accessKey.SecretAccessKey,
  162. }, nil
  163. }
  164. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains for the provided region
  165. func (accessKey AWSAccessKey) CreateConfig(region string) (awsSDK.Config, error) {
  166. var cfg awsSDK.Config
  167. var err error
  168. // If accessKey values have not been provided, attempt to load cfg from service key annotations
  169. if accessKey.AccessKeyID == "" && accessKey.SecretAccessKey == "" {
  170. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithRegion(region))
  171. if err != nil {
  172. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region from annotation %s: %s", region, err)
  173. }
  174. } else {
  175. // The AWS SDK v2 requires an object fulfilling the CredentialsProvider interface, which cloud.AWSAccessKey does
  176. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithCredentialsProvider(accessKey), config.WithRegion(region))
  177. if err != nil {
  178. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region %s: %s", region, err)
  179. }
  180. }
  181. return cfg, nil
  182. }
  183. // AWSPricing maps a k8s node to an AWS Pricing "product"
  184. type AWSPricing struct {
  185. Products map[string]*AWSProduct `json:"products"`
  186. Terms AWSPricingTerms `json:"terms"`
  187. }
  188. // AWSProduct represents a purchased SKU
  189. type AWSProduct struct {
  190. Sku string `json:"sku"`
  191. Attributes AWSProductAttributes `json:"attributes"`
  192. }
  // AWSProductAttributes is the product metadata used to map a pricing product
// to a node. All values are kept as the strings found in the pricing document.
type AWSProductAttributes struct {
	Location        string `json:"location"`
	InstanceType    string `json:"instanceType"`
	Memory          string `json:"memory"`
	Storage         string `json:"storage"`
	VCpu            string `json:"vcpu"`
	UsageType       string `json:"usagetype"`
	OperatingSystem string `json:"operatingSystem"`
	PreInstalledSw  string `json:"preInstalledSw"`
	InstanceFamily  string `json:"instanceFamily"`
	CapacityStatus  string `json:"capacitystatus"`
	GPU             string `json:"gpu"` // number of GPUs on the instance
}
  207. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  208. type AWSPricingTerms struct {
  209. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  210. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  211. }
  // AWSOfferTerm is a sku extension used to pay for the node.
type AWSOfferTerm struct {
	Sku             string                  `json:"sku"`
	PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
}

// String renders the term as "<sku>:<dim>:<rate>,<dim>:<rate>,...". Dimensions
// appear in Go map iteration order, which is not deterministic.
func (ot *AWSOfferTerm) String() string {
	strs := make([]string, 0, len(ot.PriceDimensions))
	for k, rc := range ot.PriceDimensions {
		strs = append(strs, fmt.Sprintf("%s:%s", k, rc.String()))
	}
	return fmt.Sprintf("%s:%s", ot.Sku, strings.Join(strs, ","))
}

// AWSRateCode encodes data about the price of a product.
type AWSRateCode struct {
	Unit         string          `json:"unit"`
	PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
}

// String renders the rate code as "{unit: <unit>, pricePerUnit: <price>}".
// A nil receiver (e.g. a null entry decoded into PriceDimensions) renders as
// "<nil>" instead of panicking.
func (rc *AWSRateCode) String() string {
	if rc == nil {
		return "<nil>"
	}
	// The closing brace was previously missing, producing unbalanced output.
	return fmt.Sprintf("{unit: %s, pricePerUnit: %v}", rc.Unit, rc.PricePerUnit)
}

// AWSCurrencyCode is the localized currency. (TODO: support non-USD)
type AWSCurrencyCode struct {
	USD string `json:"USD,omitempty"`
	CNY string `json:"CNY,omitempty"`
}
  237. // AWSProductTerms represents the full terms of the product
  238. type AWSProductTerms struct {
  239. Sku string `json:"sku"`
  240. OnDemand *AWSOfferTerm `json:"OnDemand"`
  241. Reserved *AWSOfferTerm `json:"Reserved"`
  242. Memory string `json:"memory"`
  243. Storage string `json:"storage"`
  244. VCpu string `json:"vcpu"`
  245. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  246. PV *PV `json:"pv"`
  247. }
  const (
	// ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId.
	ClusterIdEnvVar = "AWS_CLUSTER_ID"

	// OnDemandRateCode is appended to a node sku.
	OnDemandRateCode = ".JRTCKXETXF"
	// OnDemandRateCodeCn is the on-demand rate code, presumably for the China
	// (cn-) pricing endpoint — confirm.
	OnDemandRateCodeCn = ".99YE2YK9UR"

	// ReservedRateCode is appended to a node sku.
	ReservedRateCode = ".38NPMPTW36"

	// HourlyRateCode is appended to a node sku.
	HourlyRateCode = ".6YS6EN2CT7"
	// HourlyRateCodeCn is the hourly rate code, presumably for the China (cn-)
	// pricing endpoint — confirm.
	HourlyRateCodeCn = ".Q7UJUT2CE6"
)
  // volTypes maps in both directions between AWS EBS UsageTypes and EBS volume
// types as they appear in K8s storage class parameters and the EC2 API.
var volTypes = map[string]string{
	// UsageType -> volume type
	"EBS:VolumeUsage":        "standard",
	"EBS:VolumeUsage.gp2":    "gp2",
	"EBS:VolumeUsage.sc1":    "sc1",
	"EBS:VolumeUsage.st1":    "st1",
	"EBS:VolumeUsage.piops":  "io1",
	"EBS:VolumeP-IOPS.piops": "io1",

	// volume type -> UsageType
	"standard": "EBS:VolumeUsage",
	"gp2":      "EBS:VolumeUsage.gp2",
	"sc1":      "EBS:VolumeUsage.sc1",
	"st1":      "EBS:VolumeUsage.st1",
	"io1":      "EBS:VolumeUsage.piops",
}
  // locationToRegion maps AWS region names (as they come from Billing / the
// Pricing API "location" attribute) to actual region identifiers. It is kept
// in step with awsRegions; a missing entry makes KubeAttrConversion produce an
// empty region for that location.
var locationToRegion = map[string]string{
	"US East (Ohio)":            "us-east-2",
	"US East (N. Virginia)":     "us-east-1",
	"US West (N. California)":   "us-west-1",
	"US West (Oregon)":          "us-west-2",
	"Asia Pacific (Hong Kong)":  "ap-east-1",
	"Asia Pacific (Mumbai)":     "ap-south-1",
	"Asia Pacific (Osaka)":      "ap-northeast-3",
	"Asia Pacific (Seoul)":      "ap-northeast-2",
	"Asia Pacific (Singapore)":  "ap-southeast-1",
	"Asia Pacific (Sydney)":     "ap-southeast-2",
	"Asia Pacific (Tokyo)":      "ap-northeast-1",
	"Asia Pacific (Jakarta)":    "ap-southeast-3",
	"Canada (Central)":          "ca-central-1",
	"China (Beijing)":           "cn-north-1",
	"China (Ningxia)":           "cn-northwest-1",
	"EU (Frankfurt)":            "eu-central-1",
	"EU (Ireland)":              "eu-west-1",
	"EU (London)":               "eu-west-2",
	"EU (Paris)":                "eu-west-3",
	"EU (Stockholm)":            "eu-north-1",
	"EU (Milan)":                "eu-south-1",
	"Middle East (Bahrain)":     "me-south-1", // listed in awsRegions but previously unmapped
	"South America (Sao Paulo)": "sa-east-1",
	"Africa (Cape Town)":        "af-south-1",
	"AWS GovCloud (US-East)":    "us-gov-east-1",
	"AWS GovCloud (US-West)":    "us-gov-west-1",
}
  // regionToBillingRegionCode maps a region identifier to the prefix AWS uses for
// that region in billing usage types (for example "USE2" in "USE2-BoxUsage").
// Regions whose usage types carry no prefix map to "". It is kept in step with
// awsRegions.
var regionToBillingRegionCode = map[string]string{
	"us-east-2":      "USE2",
	"us-east-1":      "",
	"us-west-1":      "USW1",
	"us-west-2":      "USW2",
	"ap-east-1":      "APE1",
	"ap-south-1":     "APS3",
	"ap-northeast-3": "APN3",
	"ap-northeast-2": "APN2",
	"ap-southeast-1": "APS1",
	"ap-southeast-2": "APS2",
	"ap-northeast-1": "APN1",
	"ap-southeast-3": "APS4",
	"ca-central-1":   "CAN1",
	"cn-north-1":     "",
	"cn-northwest-1": "",
	"eu-central-1":   "EUC1",
	"eu-west-1":      "EU",
	"eu-west-2":      "EUW2",
	"eu-west-3":      "EUW3",
	"eu-north-1":     "EUN1",
	"eu-south-1":     "EUS1",
	"me-south-1":     "MES1", // listed in awsRegions but previously missing
	"sa-east-1":      "SAE1",
	"af-south-1":     "AFS1",
	"us-gov-east-1":  "UGE1",
	"us-gov-west-1":  "UGW1",
}
  330. var loadedAWSSecret bool = false
  331. var awsSecret *AWSAccessKey = nil
  332. func (aws *AWS) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
  333. return ""
  334. }
  335. // KubeAttrConversion maps the k8s labels for region to an aws region
  336. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  337. operatingSystem = strings.ToLower(operatingSystem)
  338. region := locationToRegion[location]
  339. return region + "," + instanceType + "," + operatingSystem
  340. }
  // AwsSpotFeedInfo is the JSON payload for a SpotInfoUpdateType config update:
// where the spot data feed lives, which credentials to use, and which node
// label marks spot instances.
type AwsSpotFeedInfo struct {
	BucketName       string `json:"bucketName"`
	Prefix           string `json:"prefix"`
	Region           string `json:"region"`
	AccountID        string `json:"projectID"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	SpotLabel        string `json:"spotLabel"`
	SpotLabelValue   string `json:"spotLabelValue"`
}
  // AwsAthenaInfo contains configuration for CUR integration through Athena.
type AwsAthenaInfo struct {
	AthenaBucketName string `json:"athenaBucketName"`
	AthenaRegion     string `json:"athenaRegion"`
	AthenaDatabase   string `json:"athenaDatabase"`
	AthenaTable      string `json:"athenaTable"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	AccountID        string `json:"projectID"`
	MasterPayerARN   string `json:"masterPayerARN"`
}

// IsEmpty reports whether every field of the configuration is the empty string.
func (aai *AwsAthenaInfo) IsEmpty() bool {
	fields := []string{
		aai.AthenaBucketName, aai.AthenaRegion, aai.AthenaDatabase, aai.AthenaTable,
		aai.ServiceKeyName, aai.ServiceKeySecret, aai.AccountID, aai.MasterPayerARN,
	}
	for _, field := range fields {
		if field != "" {
			return false
		}
	}
	return true
}
  374. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains
  375. func (aai *AwsAthenaInfo) CreateConfig() (awsSDK.Config, error) {
  376. keyProvider := AWSAccessKey{AccessKeyID: aai.ServiceKeyName, SecretAccessKey: aai.ServiceKeySecret}
  377. cfg, err := keyProvider.CreateConfig(aai.AthenaRegion)
  378. if err != nil {
  379. return cfg, err
  380. }
  381. if aai.MasterPayerARN != "" {
  382. // Create the credentials from AssumeRoleProvider to assume the role
  383. // referenced by the roleARN.
  384. stsSvc := sts.NewFromConfig(cfg)
  385. creds := stscreds.NewAssumeRoleProvider(stsSvc, aai.MasterPayerARN)
  386. cfg.Credentials = awsSDK.NewCredentialsCache(creds)
  387. }
  388. return cfg, nil
  389. }
  390. func (aws *AWS) GetManagementPlatform() (string, error) {
  391. nodes := aws.Clientset.GetAllNodes()
  392. if len(nodes) > 0 {
  393. n := nodes[0]
  394. version := n.Status.NodeInfo.KubeletVersion
  395. if strings.Contains(version, "eks") {
  396. return "eks", nil
  397. }
  398. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  399. return "kops", nil
  400. }
  401. }
  402. return "", nil
  403. }
  404. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  405. c, err := aws.Config.GetCustomPricingData()
  406. if err != nil {
  407. return nil, err
  408. }
  409. if c.Discount == "" {
  410. c.Discount = "0%"
  411. }
  412. if c.NegotiatedDiscount == "" {
  413. c.NegotiatedDiscount = "0%"
  414. }
  415. if c.ShareTenancyCosts == "" {
  416. c.ShareTenancyCosts = defaultShareTenancyCost
  417. }
  418. return c, nil
  419. }
  420. // GetAWSAccessKey generate an AWSAccessKey object from the config
  421. func (aws *AWS) GetAWSAccessKey() (*AWSAccessKey, error) {
  422. config, err := aws.GetConfig()
  423. if err != nil {
  424. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  425. }
  426. err = aws.ConfigureAuthWith(config)
  427. if err != nil {
  428. return nil, fmt.Errorf("error configuring Cloud Provider %s", err)
  429. }
  430. //Look for service key values in env if not present in config
  431. if config.ServiceKeyName == "" {
  432. config.ServiceKeyName = env.GetAWSAccessKeyID()
  433. }
  434. if config.ServiceKeySecret == "" {
  435. config.ServiceKeySecret = env.GetAWSAccessKeySecret()
  436. }
  437. if config.ServiceKeyName == "" && config.ServiceKeySecret == "" {
  438. log.DedupedInfof(1, "missing service key values for AWS cloud integration attempting to use service account integration")
  439. }
  440. return &AWSAccessKey{AccessKeyID: config.ServiceKeyName, SecretAccessKey: config.ServiceKeySecret}, nil
  441. }
  442. // GetAWSAthenaInfo generate an AWSAthenaInfo object from the config
  443. func (aws *AWS) GetAWSAthenaInfo() (*AwsAthenaInfo, error) {
  444. config, err := aws.GetConfig()
  445. if err != nil {
  446. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  447. }
  448. aak, err := aws.GetAWSAccessKey()
  449. if err != nil {
  450. return nil, err
  451. }
  452. return &AwsAthenaInfo{
  453. AthenaBucketName: config.AthenaBucketName,
  454. AthenaRegion: config.AthenaRegion,
  455. AthenaDatabase: config.AthenaDatabase,
  456. AthenaTable: config.AthenaTable,
  457. ServiceKeyName: aak.AccessKeyID,
  458. ServiceKeySecret: aak.SecretAccessKey,
  459. AccountID: config.AthenaProjectID,
  460. MasterPayerARN: config.MasterPayerARN,
  461. }, nil
  462. }
  463. func (aws *AWS) UpdateConfigFromConfigMap(cm map[string]string) (*CustomPricing, error) {
  464. return aws.Config.UpdateFromMap(cm)
  465. }
  466. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  467. return aws.Config.Update(func(c *CustomPricing) error {
  468. if updateType == SpotInfoUpdateType {
  469. asfi := AwsSpotFeedInfo{}
  470. err := json.NewDecoder(r).Decode(&asfi)
  471. if err != nil {
  472. return err
  473. }
  474. c.ServiceKeyName = asfi.ServiceKeyName
  475. if asfi.ServiceKeySecret != "" {
  476. c.ServiceKeySecret = asfi.ServiceKeySecret
  477. }
  478. c.SpotDataPrefix = asfi.Prefix
  479. c.SpotDataBucket = asfi.BucketName
  480. c.ProjectID = asfi.AccountID
  481. c.SpotDataRegion = asfi.Region
  482. c.SpotLabel = asfi.SpotLabel
  483. c.SpotLabelValue = asfi.SpotLabelValue
  484. } else if updateType == AthenaInfoUpdateType {
  485. aai := AwsAthenaInfo{}
  486. err := json.NewDecoder(r).Decode(&aai)
  487. if err != nil {
  488. return err
  489. }
  490. c.AthenaBucketName = aai.AthenaBucketName
  491. c.AthenaRegion = aai.AthenaRegion
  492. c.AthenaDatabase = aai.AthenaDatabase
  493. c.AthenaTable = aai.AthenaTable
  494. c.ServiceKeyName = aai.ServiceKeyName
  495. if aai.ServiceKeySecret != "" {
  496. c.ServiceKeySecret = aai.ServiceKeySecret
  497. }
  498. if aai.MasterPayerARN != "" {
  499. c.MasterPayerARN = aai.MasterPayerARN
  500. }
  501. c.AthenaProjectID = aai.AccountID
  502. } else {
  503. a := make(map[string]interface{})
  504. err := json.NewDecoder(r).Decode(&a)
  505. if err != nil {
  506. return err
  507. }
  508. for k, v := range a {
  509. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  510. vstr, ok := v.(string)
  511. if ok {
  512. err := SetCustomPricingField(c, kUpper, vstr)
  513. if err != nil {
  514. return err
  515. }
  516. } else {
  517. return fmt.Errorf("type error while updating config for %s", kUpper)
  518. }
  519. }
  520. }
  521. if env.IsRemoteEnabled() {
  522. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  523. if err != nil {
  524. return err
  525. }
  526. }
  527. return nil
  528. })
  529. }
  // awsKey carries the node information needed to look up node pricing: the
// node's labels, its provider ID, and the label name/value that marks spot
// nodes.
type awsKey struct {
	SpotLabelName, SpotLabelValue string
	Labels                        map[string]string
	ProviderID                    string
}

// GPUType always returns the empty string for AWS node keys.
func (k *awsKey) GPUType() string {
	var gpuType string
	return gpuType
}
  539. func (k *awsKey) ID() string {
  540. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  541. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  542. if matchNum == 2 {
  543. return group
  544. }
  545. }
  546. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  547. return ""
  548. }
  549. func (k *awsKey) Features() string {
  550. instanceType, _ := util.GetInstanceType(k.Labels)
  551. operatingSystem, _ := util.GetOperatingSystem(k.Labels)
  552. region, _ := util.GetRegion(k.Labels)
  553. key := region + "," + instanceType + "," + operatingSystem
  554. usageType := PreemptibleType
  555. spotKey := key + "," + usageType
  556. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  557. return spotKey
  558. }
  559. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  560. return spotKey
  561. }
  562. return key
  563. }
  564. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  565. pricing, ok := aws.Pricing[pvk.Features()]
  566. if !ok {
  567. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  568. return &PV{}, nil
  569. }
  570. return pricing.PV, nil
  571. }
  // awsPVKey carries what is needed to price a persistent volume: its labels,
// its storage class name and parameters, its name, a fallback region, and the
// volume's provider ID.
type awsPVKey struct {
	Labels, StorageClassParameters                    map[string]string
	StorageClassName, Name, DefaultRegion, ProviderID string
}
  580. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  581. providerID := ""
  582. if pv.Spec.AWSElasticBlockStore != nil {
  583. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  584. } else if pv.Spec.CSI != nil {
  585. providerID = pv.Spec.CSI.VolumeHandle
  586. }
  587. return &awsPVKey{
  588. Labels: pv.Labels,
  589. StorageClassName: pv.Spec.StorageClassName,
  590. StorageClassParameters: parameters,
  591. Name: pv.Name,
  592. DefaultRegion: defaultRegion,
  593. ProviderID: providerID,
  594. }
  595. }
  596. func (key *awsPVKey) ID() string {
  597. return key.ProviderID
  598. }
  599. func (key *awsPVKey) GetStorageClass() string {
  600. return key.StorageClassName
  601. }
  602. func (key *awsPVKey) Features() string {
  603. storageClass := key.StorageClassParameters["type"]
  604. if storageClass == "standard" {
  605. storageClass = "gp2"
  606. }
  607. // Storage class names are generally EBS volume types (gp2)
  608. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  609. // Converts between the 2
  610. region, ok := util.GetRegion(key.Labels)
  611. if !ok {
  612. region = key.DefaultRegion
  613. }
  614. class, ok := volTypes[storageClass]
  615. if !ok {
  616. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  617. }
  618. return region + "," + class
  619. }
  620. // GetKey maps node labels to information needed to retrieve pricing data
  621. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  622. return &awsKey{
  623. SpotLabelName: aws.SpotLabelName,
  624. SpotLabelValue: aws.SpotLabelValue,
  625. Labels: labels,
  626. ProviderID: labels["providerID"],
  627. }
  628. }
  629. func (aws *AWS) isPreemptible(key string) bool {
  630. s := strings.Split(key, ",")
  631. if len(s) == 4 && s[3] == PreemptibleType {
  632. return true
  633. }
  634. return false
  635. }
  636. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  637. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  638. }
  639. // Use the pricing data from the current region. Fall back to using all region data if needed.
  640. func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, error) {
  641. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/"
  642. region := ""
  643. multiregion := false
  644. for _, n := range nodeList {
  645. labels := n.GetLabels()
  646. currentNodeRegion := ""
  647. if r, ok := util.GetRegion(labels); ok {
  648. currentNodeRegion = r
  649. // Switch to Chinese endpoint for regions with the Chinese prefix
  650. if strings.HasPrefix(currentNodeRegion, "cn-") {
  651. pricingURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/AmazonEC2/current/"
  652. }
  653. } else {
  654. multiregion = true // We weren't able to detect the node's region, so pull all data.
  655. break
  656. }
  657. if region == "" { // We haven't set a region yet
  658. region = currentNodeRegion
  659. } else if region != "" && currentNodeRegion != region { // If two nodes have different regions here, we'll need to fetch all pricing data.
  660. multiregion = true
  661. break
  662. }
  663. }
  664. // Chinese multiregion endpoint only contains data for Chinese regions and Chinese regions are excluded from other endpoint
  665. if region != "" && !multiregion {
  666. pricingURL += region + "/"
  667. }
  668. pricingURL += "index.json"
  669. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  670. resp, err := http.Get(pricingURL)
  671. if err != nil {
  672. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  673. return nil, pricingURL, err
  674. }
  675. return resp, pricingURL, err
  676. }
  677. // DownloadPricingData fetches data from the AWS Pricing API
  678. func (aws *AWS) DownloadPricingData() error {
  679. aws.DownloadPricingDataLock.Lock()
  680. defer aws.DownloadPricingDataLock.Unlock()
  681. c, err := aws.Config.GetCustomPricingData()
  682. if err != nil {
  683. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  684. }
  685. aws.BaseCPUPrice = c.CPU
  686. aws.BaseRAMPrice = c.RAM
  687. aws.BaseGPUPrice = c.GPU
  688. aws.BaseSpotCPUPrice = c.SpotCPU
  689. aws.BaseSpotRAMPrice = c.SpotRAM
  690. aws.BaseSpotGPUPrice = c.SpotGPU
  691. aws.SpotLabelName = c.SpotLabel
  692. aws.SpotLabelValue = c.SpotLabelValue
  693. aws.SpotDataBucket = c.SpotDataBucket
  694. aws.SpotDataPrefix = c.SpotDataPrefix
  695. aws.ProjectID = c.ProjectID
  696. aws.SpotDataRegion = c.SpotDataRegion
  697. aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
  698. // Need valid values for at least one of the three fields to consider spot pricing enabled
  699. if len(aws.SpotDataBucket) != 0 || len(aws.SpotDataRegion) != 0 || len(aws.ProjectID) != 0 {
  700. aws.SpotRefreshEnabled = true
  701. } else {
  702. aws.SpotRefreshEnabled = false
  703. }
  704. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  705. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  706. }
  707. nodeList := aws.Clientset.GetAllNodes()
  708. inputkeys := make(map[string]bool)
  709. for _, n := range nodeList {
  710. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  711. aws.clusterManagementPrice = 0.10
  712. aws.clusterProvisioner = "EKS"
  713. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  714. aws.clusterProvisioner = "KOPS"
  715. }
  716. labels := n.GetObjectMeta().GetLabels()
  717. key := aws.GetKey(labels, n)
  718. inputkeys[key.Features()] = true
  719. }
  720. pvList := aws.Clientset.GetAllPersistentVolumes()
  721. storageClasses := aws.Clientset.GetAllStorageClasses()
  722. storageClassMap := make(map[string]map[string]string)
  723. for _, storageClass := range storageClasses {
  724. params := storageClass.Parameters
  725. storageClassMap[storageClass.ObjectMeta.Name] = params
  726. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  727. storageClassMap["default"] = params
  728. storageClassMap[""] = params
  729. }
  730. }
  731. pvkeys := make(map[string]PVKey)
  732. for _, pv := range pvList {
  733. params, ok := storageClassMap[pv.Spec.StorageClassName]
  734. if !ok {
  735. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  736. continue
  737. }
  738. key := aws.GetPVKey(pv, params, "")
  739. pvkeys[key.Features()] = key
  740. }
  741. // RIDataRunning establishes the existance of the goroutine. Since it's possible we
  742. // run multiple downloads, we don't want to create multiple go routines if one already exists
  743. if !aws.RIDataRunning {
  744. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  745. if err != nil {
  746. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  747. } else { // If we make one successful run, check on new reservation data every hour
  748. go func() {
  749. defer errors.HandlePanic()
  750. aws.RIDataRunning = true
  751. for {
  752. klog.Infof("Reserved Instance watcher running... next update in 1h")
  753. time.Sleep(time.Hour)
  754. err := aws.GetReservationDataFromAthena()
  755. if err != nil {
  756. klog.Infof("Error updating RI data: %s", err.Error())
  757. }
  758. }
  759. }()
  760. }
  761. }
  762. if !aws.SavingsPlanDataRunning {
  763. err = aws.GetSavingsPlanDataFromAthena()
  764. if err != nil {
  765. klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
  766. } else {
  767. go func() {
  768. defer errors.HandlePanic()
  769. aws.SavingsPlanDataRunning = true
  770. for {
  771. klog.Infof("Savings Plan watcher running... next update in 1h")
  772. time.Sleep(time.Hour)
  773. err := aws.GetSavingsPlanDataFromAthena()
  774. if err != nil {
  775. klog.Infof("Error updating Savings Plan data: %s", err.Error())
  776. }
  777. }
  778. }()
  779. }
  780. }
  781. aws.Pricing = make(map[string]*AWSProductTerms)
  782. aws.ValidPricingKeys = make(map[string]bool)
  783. skusToKeys := make(map[string]string)
  784. resp, pricingURL, err := aws.getRegionPricing(nodeList)
  785. if err != nil {
  786. return err
  787. }
  788. dec := json.NewDecoder(resp.Body)
  789. for {
  790. t, err := dec.Token()
  791. if err == io.EOF {
  792. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  793. break
  794. } else if err != nil {
  795. klog.V(2).Infof("error parsing response json %v", resp.Body)
  796. break
  797. }
  798. if t == "products" {
  799. _, err := dec.Token() // this should parse the opening "{""
  800. if err != nil {
  801. return err
  802. }
  803. for dec.More() {
  804. _, err := dec.Token() // the sku token
  805. if err != nil {
  806. return err
  807. }
  808. product := &AWSProduct{}
  809. err = dec.Decode(&product)
  810. if err != nil {
  811. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  812. break
  813. }
  814. if product.Attributes.PreInstalledSw == "NA" &&
  815. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
  816. product.Attributes.CapacityStatus == "Used" {
  817. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  818. spotKey := key + ",preemptible"
  819. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  820. productTerms := &AWSProductTerms{
  821. Sku: product.Sku,
  822. Memory: product.Attributes.Memory,
  823. Storage: product.Attributes.Storage,
  824. VCpu: product.Attributes.VCpu,
  825. GPU: product.Attributes.GPU,
  826. }
  827. aws.Pricing[key] = productTerms
  828. aws.Pricing[spotKey] = productTerms
  829. skusToKeys[product.Sku] = key
  830. }
  831. aws.ValidPricingKeys[key] = true
  832. aws.ValidPricingKeys[spotKey] = true
  833. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  834. // UsageTypes may be prefixed with a region code - we're removing this when using
  835. // volTypes to keep lookups generic
  836. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  837. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  838. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  839. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  840. spotKey := key + ",preemptible"
  841. pv := &PV{
  842. Class: volTypes[usageTypeNoRegion],
  843. Region: locationToRegion[product.Attributes.Location],
  844. }
  845. productTerms := &AWSProductTerms{
  846. Sku: product.Sku,
  847. PV: pv,
  848. }
  849. aws.Pricing[key] = productTerms
  850. aws.Pricing[spotKey] = productTerms
  851. skusToKeys[product.Sku] = key
  852. aws.ValidPricingKeys[key] = true
  853. aws.ValidPricingKeys[spotKey] = true
  854. }
  855. }
  856. }
  857. if t == "terms" {
  858. _, err := dec.Token() // this should parse the opening "{""
  859. if err != nil {
  860. return err
  861. }
  862. termType, err := dec.Token()
  863. if err != nil {
  864. return err
  865. }
  866. if termType == "OnDemand" {
  867. _, err := dec.Token()
  868. if err != nil { // again, should parse an opening "{"
  869. return err
  870. }
  871. for dec.More() {
  872. sku, err := dec.Token()
  873. if err != nil {
  874. return err
  875. }
  876. _, err = dec.Token() // another opening "{"
  877. if err != nil {
  878. return err
  879. }
  880. skuOnDemand, err := dec.Token()
  881. if err != nil {
  882. return err
  883. }
  884. offerTerm := &AWSOfferTerm{}
  885. err = dec.Decode(&offerTerm)
  886. if err != nil {
  887. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  888. }
  889. key, ok := skusToKeys[sku.(string)]
  890. spotKey := key + ",preemptible"
  891. if ok {
  892. aws.Pricing[key].OnDemand = offerTerm
  893. aws.Pricing[spotKey].OnDemand = offerTerm
  894. var cost string
  895. if sku.(string)+OnDemandRateCode == skuOnDemand {
  896. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  897. } else if sku.(string)+OnDemandRateCodeCn == skuOnDemand {
  898. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCodeCn+HourlyRateCodeCn].PricePerUnit.CNY
  899. }
  900. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  901. // If the specific UsageType is the per IO cost used on io1 volumes
  902. // we need to add the per IO cost to the io1 PV cost
  903. // Add the per IO cost to the PV object for the io1 volume type
  904. aws.Pricing[key].PV.CostPerIO = cost
  905. } else if strings.Contains(key, "EBS:Volume") {
  906. // If volume, we need to get hourly cost and add it to the PV object
  907. costFloat, _ := strconv.ParseFloat(cost, 64)
  908. hourlyPrice := costFloat / 730
  909. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  910. }
  911. }
  912. _, err = dec.Token()
  913. if err != nil {
  914. return err
  915. }
  916. }
  917. _, err = dec.Token()
  918. if err != nil {
  919. return err
  920. }
  921. }
  922. }
  923. }
  924. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  925. if !aws.SpotRefreshEnabled {
  926. return nil
  927. }
  928. // Always run spot pricing refresh when performing download
  929. aws.refreshSpotPricing(true)
  930. // Only start a single refresh goroutine
  931. if !aws.SpotRefreshRunning {
  932. aws.SpotRefreshRunning = true
  933. go func() {
  934. defer errors.HandlePanic()
  935. for {
  936. klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  937. time.Sleep(SpotRefreshDuration)
  938. // Reoccurring refresh checks update times
  939. aws.refreshSpotPricing(false)
  940. }
  941. }()
  942. }
  943. return nil
  944. }
  945. func (aws *AWS) refreshSpotPricing(force bool) {
  946. aws.SpotPricingLock.Lock()
  947. defer aws.SpotPricingLock.Unlock()
  948. now := time.Now().UTC()
  949. updateTime := now.Add(-SpotRefreshDuration)
  950. // Return if there was an update time set and an hour hasn't elapsed
  951. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  952. return
  953. }
  954. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
  955. if err != nil {
  956. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  957. aws.SpotPricingError = err
  958. return
  959. }
  960. aws.SpotPricingError = nil
  961. // update time last updated
  962. aws.SpotPricingUpdatedAt = &now
  963. aws.SpotPricingByInstanceID = sp
  964. }
  965. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  966. func (aws *AWS) NetworkPricing() (*Network, error) {
  967. cpricing, err := aws.Config.GetCustomPricingData()
  968. if err != nil {
  969. return nil, err
  970. }
  971. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  972. if err != nil {
  973. return nil, err
  974. }
  975. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  976. if err != nil {
  977. return nil, err
  978. }
  979. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  980. if err != nil {
  981. return nil, err
  982. }
  983. return &Network{
  984. ZoneNetworkEgressCost: znec,
  985. RegionNetworkEgressCost: rnec,
  986. InternetNetworkEgressCost: inec,
  987. }, nil
  988. }
  989. func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
  990. fffrc := 0.025
  991. afrc := 0.010
  992. lbidc := 0.008
  993. numForwardingRules := 1.0
  994. dataIngressGB := 0.0
  995. var totalCost float64
  996. if numForwardingRules < 5 {
  997. totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
  998. } else {
  999. totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
  1000. }
  1001. return &LoadBalancer{
  1002. Cost: totalCost,
  1003. }, nil
  1004. }
  1005. // AllNodePricing returns all the billing data fetched.
  1006. func (aws *AWS) AllNodePricing() (interface{}, error) {
  1007. aws.DownloadPricingDataLock.RLock()
  1008. defer aws.DownloadPricingDataLock.RUnlock()
  1009. return aws.Pricing, nil
  1010. }
  1011. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  1012. aws.SpotPricingLock.RLock()
  1013. defer aws.SpotPricingLock.RUnlock()
  1014. info, ok := aws.SpotPricingByInstanceID[instanceID]
  1015. return info, ok
  1016. }
  1017. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  1018. aws.RIDataLock.RLock()
  1019. defer aws.RIDataLock.RUnlock()
  1020. data, ok := aws.RIPricingByInstanceID[instanceID]
  1021. return data, ok
  1022. }
  1023. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  1024. aws.SavingsPlanDataLock.RLock()
  1025. defer aws.SavingsPlanDataLock.RUnlock()
  1026. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  1027. return data, ok
  1028. }
  1029. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  1030. key := k.Features()
  1031. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  1032. var spotcost string
  1033. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  1034. arr := strings.Split(spotInfo.Charge, " ")
  1035. if len(arr) == 2 {
  1036. spotcost = arr[0]
  1037. } else {
  1038. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  1039. }
  1040. return &Node{
  1041. Cost: spotcost,
  1042. VCPU: terms.VCpu,
  1043. RAM: terms.Memory,
  1044. GPU: terms.GPU,
  1045. Storage: terms.Storage,
  1046. BaseCPUPrice: aws.BaseCPUPrice,
  1047. BaseRAMPrice: aws.BaseRAMPrice,
  1048. BaseGPUPrice: aws.BaseGPUPrice,
  1049. UsageType: PreemptibleType,
  1050. }, nil
  1051. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  1052. log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
  1053. return &Node{
  1054. VCPU: terms.VCpu,
  1055. VCPUCost: aws.BaseSpotCPUPrice,
  1056. RAM: terms.Memory,
  1057. GPU: terms.GPU,
  1058. Storage: terms.Storage,
  1059. BaseCPUPrice: aws.BaseCPUPrice,
  1060. BaseRAMPrice: aws.BaseRAMPrice,
  1061. BaseGPUPrice: aws.BaseGPUPrice,
  1062. UsageType: PreemptibleType,
  1063. }, nil
  1064. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  1065. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  1066. return &Node{
  1067. Cost: strCost,
  1068. VCPU: terms.VCpu,
  1069. RAM: terms.Memory,
  1070. GPU: terms.GPU,
  1071. Storage: terms.Storage,
  1072. BaseCPUPrice: aws.BaseCPUPrice,
  1073. BaseRAMPrice: aws.BaseRAMPrice,
  1074. BaseGPUPrice: aws.BaseGPUPrice,
  1075. UsageType: usageType,
  1076. }, nil
  1077. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  1078. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  1079. return &Node{
  1080. Cost: strCost,
  1081. VCPU: terms.VCpu,
  1082. RAM: terms.Memory,
  1083. GPU: terms.GPU,
  1084. Storage: terms.Storage,
  1085. BaseCPUPrice: aws.BaseCPUPrice,
  1086. BaseRAMPrice: aws.BaseRAMPrice,
  1087. BaseGPUPrice: aws.BaseGPUPrice,
  1088. UsageType: usageType,
  1089. }, nil
  1090. }
  1091. var cost string
  1092. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  1093. if ok {
  1094. cost = c.PricePerUnit.USD
  1095. } else {
  1096. // Check for Chinese pricing before throwing error
  1097. c, ok = terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCodeCn+HourlyRateCodeCn]
  1098. if ok {
  1099. cost = c.PricePerUnit.CNY
  1100. } else {
  1101. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  1102. }
  1103. }
  1104. return &Node{
  1105. Cost: cost,
  1106. VCPU: terms.VCpu,
  1107. RAM: terms.Memory,
  1108. GPU: terms.GPU,
  1109. Storage: terms.Storage,
  1110. BaseCPUPrice: aws.BaseCPUPrice,
  1111. BaseRAMPrice: aws.BaseRAMPrice,
  1112. BaseGPUPrice: aws.BaseGPUPrice,
  1113. UsageType: usageType,
  1114. }, nil
  1115. }
  1116. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  1117. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  1118. aws.DownloadPricingDataLock.RLock()
  1119. defer aws.DownloadPricingDataLock.RUnlock()
  1120. key := k.Features()
  1121. usageType := "ondemand"
  1122. if aws.isPreemptible(key) {
  1123. usageType = PreemptibleType
  1124. }
  1125. terms, ok := aws.Pricing[key]
  1126. if ok {
  1127. return aws.createNode(terms, usageType, k)
  1128. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  1129. aws.DownloadPricingDataLock.RUnlock()
  1130. err := aws.DownloadPricingData()
  1131. aws.DownloadPricingDataLock.RLock()
  1132. if err != nil {
  1133. return &Node{
  1134. Cost: aws.BaseCPUPrice,
  1135. BaseCPUPrice: aws.BaseCPUPrice,
  1136. BaseRAMPrice: aws.BaseRAMPrice,
  1137. BaseGPUPrice: aws.BaseGPUPrice,
  1138. UsageType: usageType,
  1139. UsesBaseCPUPrice: true,
  1140. }, err
  1141. }
  1142. terms, termsOk := aws.Pricing[key]
  1143. if !termsOk {
  1144. return &Node{
  1145. Cost: aws.BaseCPUPrice,
  1146. BaseCPUPrice: aws.BaseCPUPrice,
  1147. BaseRAMPrice: aws.BaseRAMPrice,
  1148. BaseGPUPrice: aws.BaseGPUPrice,
  1149. UsageType: usageType,
  1150. UsesBaseCPUPrice: true,
  1151. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  1152. }
  1153. return aws.createNode(terms, usageType, k)
  1154. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  1155. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  1156. }
  1157. }
  1158. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  1159. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  1160. defaultClusterName := "AWS Cluster #1"
  1161. c, err := awsProvider.GetConfig()
  1162. if err != nil {
  1163. return nil, err
  1164. }
  1165. remoteEnabled := env.IsRemoteEnabled()
  1166. makeStructure := func(clusterName string) (map[string]string, error) {
  1167. m := make(map[string]string)
  1168. m["name"] = clusterName
  1169. m["provider"] = "AWS"
  1170. m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable else where
  1171. m["region"] = awsProvider.clusterRegion
  1172. m["id"] = env.GetClusterID()
  1173. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1174. m["provisioner"] = awsProvider.clusterProvisioner
  1175. return m, nil
  1176. }
  1177. if c.ClusterName != "" {
  1178. return makeStructure(c.ClusterName)
  1179. }
  1180. maybeClusterId := env.GetAWSClusterID()
  1181. if len(maybeClusterId) != 0 {
  1182. klog.V(2).Infof("Returning \"%s\" as ClusterName", maybeClusterId)
  1183. return makeStructure(maybeClusterId)
  1184. }
  1185. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  1186. return makeStructure(defaultClusterName)
  1187. }
  1188. // updates the authentication to the latest values (via config or secret)
  1189. func (aws *AWS) ConfigureAuth() error {
  1190. c, err := aws.Config.GetCustomPricingData()
  1191. if err != nil {
  1192. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  1193. }
  1194. return aws.ConfigureAuthWith(c)
  1195. }
  1196. // updates the authentication to the latest values (via config or secret)
  1197. func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
  1198. accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
  1199. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1200. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1201. if err != nil {
  1202. return err
  1203. }
  1204. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1205. if err != nil {
  1206. return err
  1207. }
  1208. }
  1209. return nil
  1210. }
  1211. // Gets the aws key id and secret
  1212. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  1213. // 1. Check config values first (set from frontend UI)
  1214. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  1215. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1216. Message: "AWS ServiceKey exists",
  1217. Status: true,
  1218. })
  1219. return cp.ServiceKeyName, cp.ServiceKeySecret
  1220. }
  1221. // 2. Check for secret
  1222. s, _ := aws.loadAWSAuthSecret(forceReload)
  1223. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1224. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1225. Message: "AWS ServiceKey exists",
  1226. Status: true,
  1227. })
  1228. return s.AccessKeyID, s.SecretAccessKey
  1229. }
  1230. // 3. Fall back to env vars
  1231. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1232. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1233. Message: "AWS ServiceKey exists",
  1234. Status: false,
  1235. })
  1236. } else {
  1237. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1238. Message: "AWS ServiceKey exists",
  1239. Status: true,
  1240. })
  1241. }
  1242. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1243. }
  1244. // Load once and cache the result (even on failure). This is an install time secret, so
  1245. // we don't expect the secret to change. If it does, however, we can force reload using
  1246. // the input parameter.
  1247. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1248. if !force && loadedAWSSecret {
  1249. return awsSecret, nil
  1250. }
  1251. loadedAWSSecret = true
  1252. exists, err := fileutil.FileExists(authSecretPath)
  1253. if !exists || err != nil {
  1254. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1255. }
  1256. result, err := ioutil.ReadFile(authSecretPath)
  1257. if err != nil {
  1258. return nil, err
  1259. }
  1260. var ak AWSAccessKey
  1261. err = json.Unmarshal(result, &ak)
  1262. if err != nil {
  1263. return nil, err
  1264. }
  1265. awsSecret = &ak
  1266. return awsSecret, nil
  1267. }
  1268. func (aws *AWS) getAddressesForRegion(ctx context.Context, region string) (*ec2.DescribeAddressesOutput, error) {
  1269. aak, err := aws.GetAWSAccessKey()
  1270. if err != nil {
  1271. return nil, err
  1272. }
  1273. cfg, err := aak.CreateConfig(region)
  1274. if err != nil {
  1275. return nil, err
  1276. }
  1277. cli := ec2.NewFromConfig(cfg)
  1278. return cli.DescribeAddresses(ctx, &ec2.DescribeAddressesInput{})
  1279. }
  1280. // GetAddresses retrieves EC2 addresses
  1281. func (aws *AWS) GetAddresses() ([]byte, error) {
  1282. aws.ConfigureAuth() // load authentication data into env vars
  1283. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1284. errorCh := make(chan error, len(awsRegions))
  1285. var wg sync.WaitGroup
  1286. wg.Add(len(awsRegions))
  1287. // Get volumes from each AWS region
  1288. for _, r := range awsRegions {
  1289. // Fetch IP address response and send results and errors to their
  1290. // respective channels
  1291. go func(region string) {
  1292. defer wg.Done()
  1293. defer errors.HandlePanic()
  1294. // Query for first page of volume results
  1295. resp, err := aws.getAddressesForRegion(context.TODO(), region)
  1296. if err != nil {
  1297. errorCh <- err
  1298. return
  1299. }
  1300. addressCh <- resp
  1301. }(r)
  1302. }
  1303. // Close the result channels after everything has been sent
  1304. go func() {
  1305. defer errors.HandlePanic()
  1306. wg.Wait()
  1307. close(errorCh)
  1308. close(addressCh)
  1309. }()
  1310. var addresses []*ec2Types.Address
  1311. for adds := range addressCh {
  1312. for _, add := range adds.Addresses {
  1313. a := add // duplicate to avoid pointer to iterator
  1314. addresses = append(addresses, &a)
  1315. }
  1316. }
  1317. var errs []error
  1318. for err := range errorCh {
  1319. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1320. errs = append(errs, err)
  1321. }
  1322. // Return error if no addresses are returned
  1323. if len(errs) > 0 && len(addresses) == 0 {
  1324. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errs), errs)
  1325. }
  1326. // Format the response this way to match the JSON-encoded formatting of a single response
  1327. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1328. // a "Addresss" key at the top level.
  1329. return json.Marshal(map[string][]*ec2Types.Address{
  1330. "Addresses": addresses,
  1331. })
  1332. }
  1333. func (aws *AWS) getDisksForRegion(ctx context.Context, region string, maxResults int32, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1334. aak, err := aws.GetAWSAccessKey()
  1335. if err != nil {
  1336. return nil, err
  1337. }
  1338. cfg, err := aak.CreateConfig(region)
  1339. if err != nil {
  1340. return nil, err
  1341. }
  1342. cli := ec2.NewFromConfig(cfg)
  1343. return cli.DescribeVolumes(ctx, &ec2.DescribeVolumesInput{
  1344. MaxResults: &maxResults,
  1345. NextToken: nextToken,
  1346. })
  1347. }
  1348. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1349. func (aws *AWS) GetDisks() ([]byte, error) {
  1350. aws.ConfigureAuth() // load authentication data into env vars
  1351. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1352. errorCh := make(chan error, len(awsRegions))
  1353. var wg sync.WaitGroup
  1354. wg.Add(len(awsRegions))
  1355. // Get volumes from each AWS region
  1356. for _, r := range awsRegions {
  1357. // Fetch volume response and send results and errors to their
  1358. // respective channels
  1359. go func(region string) {
  1360. defer wg.Done()
  1361. defer errors.HandlePanic()
  1362. // Query for first page of volume results
  1363. resp, err := aws.getDisksForRegion(context.TODO(), region, 1000, nil)
  1364. if err != nil {
  1365. errorCh <- err
  1366. return
  1367. }
  1368. volumeCh <- resp
  1369. // A NextToken indicates more pages of results. Keep querying
  1370. // until all pages are retrieved.
  1371. for resp.NextToken != nil {
  1372. resp, err = aws.getDisksForRegion(context.TODO(), region, 100, resp.NextToken)
  1373. if err != nil {
  1374. errorCh <- err
  1375. return
  1376. }
  1377. volumeCh <- resp
  1378. }
  1379. }(r)
  1380. }
  1381. // Close the result channels after everything has been sent
  1382. go func() {
  1383. defer errors.HandlePanic()
  1384. wg.Wait()
  1385. close(errorCh)
  1386. close(volumeCh)
  1387. }()
  1388. var volumes []*ec2Types.Volume
  1389. for vols := range volumeCh {
  1390. for _, vol := range vols.Volumes {
  1391. v := vol // duplicate to avoid pointer to iterator
  1392. volumes = append(volumes, &v)
  1393. }
  1394. }
  1395. var errs []error
  1396. for err := range errorCh {
  1397. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1398. errs = append(errs, err)
  1399. }
  1400. // Return error if no volumes are returned
  1401. if len(errs) > 0 && len(volumes) == 0 {
  1402. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errs), errs)
  1403. }
  1404. // Format the response this way to match the JSON-encoded formatting of a single response
  1405. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1406. // a "Volumes" key at the top level.
  1407. return json.Marshal(map[string][]*ec2Types.Volume{
  1408. "Volumes": volumes,
  1409. })
  1410. }
  1411. // QueryAthenaPaginated executes athena query and processes results.
  1412. func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
  1413. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  1414. if err != nil {
  1415. return err
  1416. }
  1417. if awsAthenaInfo.AthenaDatabase == "" || awsAthenaInfo.AthenaTable == "" || awsAthenaInfo.AthenaRegion == "" ||
  1418. awsAthenaInfo.AthenaBucketName == "" || awsAthenaInfo.AccountID == "" {
  1419. return fmt.Errorf("QueryAthenaPaginated: athena configuration incomplete")
  1420. }
  1421. queryExecutionCtx := &athenaTypes.QueryExecutionContext{
  1422. Database: awsSDK.String(awsAthenaInfo.AthenaDatabase),
  1423. }
  1424. resultConfiguration := &athenaTypes.ResultConfiguration{
  1425. OutputLocation: awsSDK.String(awsAthenaInfo.AthenaBucketName),
  1426. }
  1427. startQueryExecutionInput := &athena.StartQueryExecutionInput{
  1428. QueryString: awsSDK.String(query),
  1429. QueryExecutionContext: queryExecutionCtx,
  1430. ResultConfiguration: resultConfiguration,
  1431. }
  1432. // Create Athena Client
  1433. cfg, err := awsAthenaInfo.CreateConfig()
  1434. if err != nil {
  1435. log.Errorf("Could not retrieve Athena Configuration: %s", err.Error())
  1436. }
  1437. cli := athena.NewFromConfig(cfg)
  1438. // Query Athena
  1439. startQueryExecutionOutput, err := cli.StartQueryExecution(ctx, startQueryExecutionInput)
  1440. if err != nil {
  1441. return fmt.Errorf("QueryAthenaPaginated: start query error: %s", err.Error())
  1442. }
  1443. err = waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
  1444. if err != nil {
  1445. return fmt.Errorf("QueryAthenaPaginated: query execution error: %s", err.Error())
  1446. }
  1447. queryResultsInput := &athena.GetQueryResultsInput{
  1448. QueryExecutionId: startQueryExecutionOutput.QueryExecutionId,
  1449. }
  1450. getQueryResultsPaginator := athena.NewGetQueryResultsPaginator(cli, queryResultsInput)
  1451. for getQueryResultsPaginator.HasMorePages() {
  1452. pg, err := getQueryResultsPaginator.NextPage(ctx)
  1453. if err != nil {
  1454. log.Errorf("QueryAthenaPaginated: NextPage error: %s", err.Error())
  1455. continue
  1456. }
  1457. fn(pg)
  1458. }
  1459. return nil
  1460. }
  1461. func waitForQueryToComplete(ctx context.Context, client *athena.Client, queryExecutionID *string) error {
  1462. inp := &athena.GetQueryExecutionInput{
  1463. QueryExecutionId: queryExecutionID,
  1464. }
  1465. isQueryStillRunning := true
  1466. for isQueryStillRunning {
  1467. qe, err := client.GetQueryExecution(ctx, inp)
  1468. if err != nil {
  1469. return err
  1470. }
  1471. if qe.QueryExecution.Status.State == "SUCCEEDED" {
  1472. isQueryStillRunning = false
  1473. continue
  1474. }
  1475. if qe.QueryExecution.Status.State != "RUNNING" && qe.QueryExecution.Status.State != "QUEUED" {
  1476. return fmt.Errorf("no query results available for query %s", *queryExecutionID)
  1477. }
  1478. time.Sleep(2 * time.Second)
  1479. }
  1480. return nil
  1481. }
  1482. type SavingsPlanData struct {
  1483. ResourceID string
  1484. EffectiveCost float64
  1485. SavingsPlanARN string
  1486. MostRecentDate string
  1487. }
  1488. func (aws *AWS) GetSavingsPlanDataFromAthena() error {
  1489. cfg, err := aws.GetConfig()
  1490. if err != nil {
  1491. aws.RIPricingError = err
  1492. return err
  1493. }
  1494. if cfg.AthenaBucketName == "" {
  1495. err = fmt.Errorf("No Athena Bucket configured")
  1496. aws.RIPricingError = err
  1497. return err
  1498. }
  1499. if aws.SavingsPlanDataByInstanceID == nil {
  1500. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1501. }
  1502. tNow := time.Now()
  1503. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1504. start := tOneDayAgo.Format("2006-01-02")
  1505. end := tNow.Format("2006-01-02")
  1506. // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
  1507. //
  1508. q := `SELECT
  1509. line_item_usage_start_date,
  1510. savings_plan_savings_plan_a_r_n,
  1511. line_item_resource_id,
  1512. savings_plan_savings_plan_rate
  1513. FROM %s as cost_data
  1514. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1515. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1516. line_item_usage_start_date DESC`
  1517. page := 0
  1518. processResults := func(op *athena.GetQueryResultsOutput) bool {
  1519. if op == nil {
  1520. log.Errorf("GetSavingsPlanDataFromAthena: Athena page is nil")
  1521. return false
  1522. } else if op.ResultSet == nil {
  1523. log.Errorf("GetSavingsPlanDataFromAthena: Athena page.ResultSet is nil")
  1524. return false
  1525. }
  1526. aws.SavingsPlanDataLock.Lock()
  1527. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
  1528. mostRecentDate := ""
  1529. iter := op.ResultSet.Rows
  1530. if page == 0 && len(iter) > 0 {
  1531. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1532. }
  1533. page++
  1534. for _, r := range iter {
  1535. d := *r.Data[0].VarCharValue
  1536. if mostRecentDate == "" {
  1537. mostRecentDate = d
  1538. } else if mostRecentDate != d { // Get all most recent assignments
  1539. break
  1540. }
  1541. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1542. if err != nil {
  1543. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1544. }
  1545. r := &SavingsPlanData{
  1546. ResourceID: *r.Data[2].VarCharValue,
  1547. EffectiveCost: cost,
  1548. SavingsPlanARN: *r.Data[1].VarCharValue,
  1549. MostRecentDate: d,
  1550. }
  1551. aws.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1552. }
  1553. klog.V(1).Infof("Found %d savings plan applied instances", len(aws.SavingsPlanDataByInstanceID))
  1554. for k, r := range aws.SavingsPlanDataByInstanceID {
  1555. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1556. }
  1557. aws.SavingsPlanDataLock.Unlock()
  1558. return true
  1559. }
  1560. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1561. klog.V(3).Infof("Running Query: %s", query)
  1562. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  1563. if err != nil {
  1564. aws.RIPricingError = err
  1565. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1566. }
  1567. return nil
  1568. }
  1569. type RIData struct {
  1570. ResourceID string
  1571. EffectiveCost float64
  1572. ReservationARN string
  1573. MostRecentDate string
  1574. }
  1575. func (aws *AWS) GetReservationDataFromAthena() error {
  1576. cfg, err := aws.GetConfig()
  1577. if err != nil {
  1578. aws.RIPricingError = err
  1579. return err
  1580. }
  1581. if cfg.AthenaBucketName == "" {
  1582. err = fmt.Errorf("No Athena Bucket configured")
  1583. aws.RIPricingError = err
  1584. return err
  1585. }
  1586. // Query for all column names in advance in order to validate configured
  1587. // label columns
  1588. columns, _ := aws.fetchColumns()
  1589. if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
  1590. err = fmt.Errorf("no reservation data available in Athena")
  1591. aws.RIPricingError = err
  1592. return err
  1593. }
  1594. if aws.RIPricingByInstanceID == nil {
  1595. aws.RIPricingByInstanceID = make(map[string]*RIData)
  1596. }
  1597. tNow := time.Now()
  1598. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1599. start := tOneDayAgo.Format("2006-01-02")
  1600. end := tNow.Format("2006-01-02")
  1601. q := `SELECT
  1602. line_item_usage_start_date,
  1603. reservation_reservation_a_r_n,
  1604. line_item_resource_id,
  1605. reservation_effective_cost
  1606. FROM %s as cost_data
  1607. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1608. AND reservation_reservation_a_r_n <> '' ORDER BY
  1609. line_item_usage_start_date DESC`
  1610. page := 0
  1611. processResults := func(op *athena.GetQueryResultsOutput) bool {
  1612. if op == nil {
  1613. log.Errorf("GetReservationDataFromAthena: Athena page is nil")
  1614. return false
  1615. } else if op.ResultSet == nil {
  1616. log.Errorf("GetReservationDataFromAthena: Athena page.ResultSet is nil")
  1617. return false
  1618. }
  1619. aws.RIDataLock.Lock()
  1620. aws.RIPricingByInstanceID = make(map[string]*RIData) // Clean out the old data and only report a RI price if its in the most recent run.
  1621. mostRecentDate := ""
  1622. iter := op.ResultSet.Rows
  1623. if page == 0 && len(iter) > 0 {
  1624. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1625. }
  1626. page++
  1627. for _, r := range iter {
  1628. d := *r.Data[0].VarCharValue
  1629. if mostRecentDate == "" {
  1630. mostRecentDate = d
  1631. } else if mostRecentDate != d { // Get all most recent assignments
  1632. break
  1633. }
  1634. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1635. if err != nil {
  1636. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1637. }
  1638. r := &RIData{
  1639. ResourceID: *r.Data[2].VarCharValue,
  1640. EffectiveCost: cost,
  1641. ReservationARN: *r.Data[1].VarCharValue,
  1642. MostRecentDate: d,
  1643. }
  1644. aws.RIPricingByInstanceID[r.ResourceID] = r
  1645. }
  1646. klog.V(1).Infof("Found %d reserved instances", len(aws.RIPricingByInstanceID))
  1647. for k, r := range aws.RIPricingByInstanceID {
  1648. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1649. }
  1650. aws.RIDataLock.Unlock()
  1651. return true
  1652. }
  1653. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1654. klog.V(3).Infof("Running Query: %s", query)
  1655. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  1656. if err != nil {
  1657. aws.RIPricingError = err
  1658. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1659. }
  1660. aws.RIPricingError = nil
  1661. return nil
  1662. }
  1663. // fetchColumns returns a list of the names of all columns in the configured
  1664. // Athena tables
  1665. func (aws *AWS) fetchColumns() (map[string]bool, error) {
  1666. columnSet := map[string]bool{}
  1667. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  1668. if err != nil {
  1669. return nil, err
  1670. }
  1671. // This Query is supported by Athena tables and views
  1672. q := `SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'`
  1673. query := fmt.Sprintf(q, awsAthenaInfo.AthenaDatabase, awsAthenaInfo.AthenaTable)
  1674. pageNum := 0
  1675. athenaErr := aws.QueryAthenaPaginated(context.TODO(), query, func(page *athena.GetQueryResultsOutput) bool {
  1676. if page == nil {
  1677. log.Errorf("fetchColumns: Athena page is nil")
  1678. return false
  1679. } else if page.ResultSet == nil {
  1680. log.Errorf("fetchColumns: Athena page.ResultSet is nil")
  1681. return false
  1682. }
  1683. // remove header row 'column_name'
  1684. rows := page.ResultSet.Rows[1:]
  1685. for _, row := range rows {
  1686. columnSet[*row.Data[0].VarCharValue] = true
  1687. }
  1688. pageNum++
  1689. return true
  1690. })
  1691. if athenaErr != nil {
  1692. return columnSet, athenaErr
  1693. }
  1694. if len(columnSet) == 0 {
  1695. log.Infof("No columns retrieved from Athena")
  1696. }
  1697. return columnSet, nil
  1698. }
  1699. type spotInfo struct {
  1700. Timestamp string `csv:"Timestamp"`
  1701. UsageType string `csv:"UsageType"`
  1702. Operation string `csv:"Operation"`
  1703. InstanceID string `csv:"InstanceID"`
  1704. MyBidID string `csv:"MyBidID"`
  1705. MyMaxPrice string `csv:"MyMaxPrice"`
  1706. MarketPrice string `csv:"MarketPrice"`
  1707. Charge string `csv:"Charge"`
  1708. Version string `csv:"Version"`
  1709. }
  1710. func (aws *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
  1711. aws.ConfigureAuth() // configure aws api authentication by setting env vars
  1712. s3Prefix := projectID
  1713. if len(prefix) != 0 {
  1714. s3Prefix = prefix + "/" + s3Prefix
  1715. }
  1716. aak, err := aws.GetAWSAccessKey()
  1717. if err != nil {
  1718. return nil, err
  1719. }
  1720. cfg, err := aak.CreateConfig(region)
  1721. if err != nil {
  1722. return nil, err
  1723. }
  1724. cli := s3.NewFromConfig(cfg)
  1725. downloader := manager.NewDownloader(cli)
  1726. tNow := time.Now()
  1727. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1728. ls := &s3.ListObjectsInput{
  1729. Bucket: awsSDK.String(bucket),
  1730. Prefix: awsSDK.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1731. }
  1732. ls2 := &s3.ListObjectsInput{
  1733. Bucket: awsSDK.String(bucket),
  1734. Prefix: awsSDK.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1735. }
  1736. lso, err := cli.ListObjects(context.TODO(), ls)
  1737. if err != nil {
  1738. aws.serviceAccountChecks.set("bucketList", &ServiceAccountCheck{
  1739. Message: "Bucket List Permissions Available",
  1740. Status: false,
  1741. AdditionalInfo: err.Error(),
  1742. })
  1743. return nil, err
  1744. } else {
  1745. aws.serviceAccountChecks.set("bucketList", &ServiceAccountCheck{
  1746. Message: "Bucket List Permissions Available",
  1747. Status: true,
  1748. })
  1749. }
  1750. lsoLen := len(lso.Contents)
  1751. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1752. if lsoLen == 0 {
  1753. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1754. }
  1755. lso2, err := cli.ListObjects(context.TODO(), ls2)
  1756. if err != nil {
  1757. return nil, err
  1758. }
  1759. lso2Len := len(lso2.Contents)
  1760. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1761. if lso2Len == 0 {
  1762. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1763. }
  1764. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1765. var keys []*string
  1766. for _, obj := range lso.Contents {
  1767. keys = append(keys, obj.Key)
  1768. }
  1769. for _, obj := range lso2.Contents {
  1770. keys = append(keys, obj.Key)
  1771. }
  1772. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1773. header, err := csvutil.Header(spotInfo{}, "csv")
  1774. if err != nil {
  1775. return nil, err
  1776. }
  1777. fieldsPerRecord := len(header)
  1778. spots := make(map[string]*spotInfo)
  1779. for _, key := range keys {
  1780. getObj := &s3.GetObjectInput{
  1781. Bucket: awsSDK.String(bucket),
  1782. Key: key,
  1783. }
  1784. buf := manager.NewWriteAtBuffer([]byte{})
  1785. _, err := downloader.Download(context.TODO(), buf, getObj)
  1786. if err != nil {
  1787. aws.serviceAccountChecks.set("objectList", &ServiceAccountCheck{
  1788. Message: "Object Get Permissions Available",
  1789. Status: false,
  1790. AdditionalInfo: err.Error(),
  1791. })
  1792. return nil, err
  1793. } else {
  1794. aws.serviceAccountChecks.set("objectList", &ServiceAccountCheck{
  1795. Message: "Object Get Permissions Available",
  1796. Status: true,
  1797. })
  1798. }
  1799. r := bytes.NewReader(buf.Bytes())
  1800. gr, err := gzip.NewReader(r)
  1801. if err != nil {
  1802. return nil, err
  1803. }
  1804. csvReader := csv.NewReader(gr)
  1805. csvReader.Comma = '\t'
  1806. csvReader.FieldsPerRecord = fieldsPerRecord
  1807. dec, err := csvutil.NewDecoder(csvReader, header...)
  1808. if err != nil {
  1809. return nil, err
  1810. }
  1811. var foundVersion string
  1812. for {
  1813. spot := spotInfo{}
  1814. err := dec.Decode(&spot)
  1815. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1816. if err == io.EOF {
  1817. break
  1818. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1819. rec := dec.Record()
  1820. // the first two "Record()" will be the comment lines
  1821. // and they show up as len() == 1
  1822. // the first of which is "#Version"
  1823. // the second of which is "#Fields: "
  1824. if len(rec) != 1 {
  1825. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1826. continue
  1827. }
  1828. if len(foundVersion) == 0 {
  1829. spotFeedVersion := rec[0]
  1830. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1831. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1832. if matches != nil {
  1833. foundVersion = matches[1]
  1834. if foundVersion != supportedSpotFeedVersion {
  1835. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1836. break
  1837. }
  1838. }
  1839. continue
  1840. } else if strings.Index(rec[0], "#") == 0 {
  1841. continue
  1842. } else {
  1843. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1844. continue
  1845. }
  1846. } else if err != nil {
  1847. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1848. continue
  1849. }
  1850. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  1851. spots[spot.InstanceID] = &spot
  1852. }
  1853. gr.Close()
  1854. }
  1855. return spots, nil
  1856. }
  1857. // ApplyReservedInstancePricing TODO
  1858. func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1859. }
  1860. func (aws *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  1861. return aws.serviceAccountChecks.getStatus()
  1862. }
  1863. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  1864. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  1865. }
  1866. // Regions returns a predefined list of AWS regions
  1867. func (aws *AWS) Regions() []string {
  1868. return awsRegions
  1869. }