awsprovider.go 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/csv"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/opencost/opencost/pkg/kubecost"
  18. "github.com/opencost/opencost/pkg/clustercache"
  19. "github.com/opencost/opencost/pkg/env"
  20. errs "github.com/opencost/opencost/pkg/errors"
  21. "github.com/opencost/opencost/pkg/log"
  22. "github.com/opencost/opencost/pkg/util"
  23. "github.com/opencost/opencost/pkg/util/fileutil"
  24. "github.com/opencost/opencost/pkg/util/json"
  25. awsSDK "github.com/aws/aws-sdk-go-v2/aws"
  26. "github.com/aws/aws-sdk-go-v2/config"
  27. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  28. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  29. "github.com/aws/aws-sdk-go-v2/service/athena"
  30. athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
  31. "github.com/aws/aws-sdk-go-v2/service/ec2"
  32. ec2Types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
  33. "github.com/aws/aws-sdk-go-v2/service/s3"
  34. "github.com/aws/aws-sdk-go-v2/service/sts"
  35. "github.com/jszwec/csvutil"
  36. v1 "k8s.io/api/core/v1"
  37. )
  // Identifiers shared by the AWS provider: the update-type selectors understood
// by UpdateConfig, the usage-type marker used for spot nodes, and the display
// names of pricing sources.
const (
	// supportedSpotFeedVersion is the spot data feed version this provider
	// accepts (presumably compared with the major version captured by
	// versionRx — confirm at the use site).
	supportedSpotFeedVersion = "1"

	// SpotInfoUpdateType selects the spot-feed branch of UpdateConfig.
	SpotInfoUpdateType = "spotinfo"
	// AthenaInfoUpdateType selects the Athena branch of UpdateConfig.
	AthenaInfoUpdateType = "athenainfo"

	// PreemptibleType is the usage type appended to the pricing key of spot
	// nodes (see awsKey.Features and isPreemptible).
	PreemptibleType = "preemptible"

	// Pricing source display names. SpotPricingSource and
	// ReservedInstancePricingSource key the map built by PricingSourceStatus.
	APIPricingSource              = "Public API"
	SpotPricingSource             = "Spot Data Feed"
	ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
)
  // Package-level regular expressions, compiled once at init.
var (
	// provIdRx splits a node provider ID of the form
	// aws:///us-east-2a/i-0fea4fd46592d050b into the zone (group 1) and the
	// instance ID (group 2).
	provIdRx = regexp.MustCompile(`aws:///([^/]+)/([^/]+)`)
	// usageTypeRegx captures, in group 2, an "EBS..." usage type that is either
	// at the start of the string or follows a hyphen (e.g. "USE2-EBS:...").
	usageTypeRegx = regexp.MustCompile(`.*(-|^)(EBS.+)`)
	// versionRx matches a "#Version: <major>.<minor>" header line and captures
	// the major version.
	versionRx = regexp.MustCompile(`^#Version: (\d+)\.\d+$`)
)
  53. func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
  54. sources := make(map[string]*PricingSource)
  55. sps := &PricingSource{
  56. Name: SpotPricingSource,
  57. Enabled: true,
  58. }
  59. if !aws.SpotRefreshEnabled() {
  60. sps.Available = false
  61. sps.Error = "Spot instances not set up"
  62. sps.Enabled = false
  63. } else {
  64. sps.Error = ""
  65. if aws.SpotPricingError != nil {
  66. sps.Error = aws.SpotPricingError.Error()
  67. }
  68. if sps.Error != "" {
  69. sps.Available = false
  70. } else if len(aws.SpotPricingByInstanceID) > 0 {
  71. sps.Available = true
  72. } else {
  73. sps.Error = "No spot instances detected"
  74. }
  75. }
  76. sources[SpotPricingSource] = sps
  77. rps := &PricingSource{
  78. Name: ReservedInstancePricingSource,
  79. Enabled: true,
  80. }
  81. rps.Error = ""
  82. if aws.RIPricingError != nil {
  83. rps.Error = aws.RIPricingError.Error()
  84. }
  85. if rps.Error != "" {
  86. rps.Available = false
  87. } else {
  88. rps.Available = true
  89. }
  90. sources[ReservedInstancePricingSource] = rps
  91. return sources
  92. }
  // SpotRefreshDuration is the interval at which spot pricing data is refreshed.
const SpotRefreshDuration time.Duration = 15 * time.Minute
  // awsRegions lists the AWS region identifiers known to this provider. The
// order of entries is preserved exactly; grouping comments are for readability.
var awsRegions = []string{
	// United States
	"us-east-2", "us-east-1", "us-west-1", "us-west-2",
	// Asia Pacific
	"ap-east-1", "ap-south-1", "ap-northeast-3", "ap-northeast-2",
	"ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "ap-southeast-3",
	// Canada
	"ca-central-1",
	// China
	"cn-north-1", "cn-northwest-1",
	// Europe
	"eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-north-1", "eu-south-1",
	// Middle East, South America, Africa
	"me-south-1", "sa-east-1", "af-south-1",
	// AWS GovCloud
	"us-gov-east-1", "us-gov-west-1",
}
  123. // AWS represents an Amazon Provider
  124. type AWS struct {
  125. Pricing map[string]*AWSProductTerms
  126. SpotPricingByInstanceID map[string]*spotInfo
  127. SpotPricingUpdatedAt *time.Time
  128. SpotRefreshRunning bool
  129. SpotPricingLock sync.RWMutex
  130. SpotPricingError error
  131. RIPricingByInstanceID map[string]*RIData
  132. RIPricingError error
  133. RIDataRunning bool
  134. RIDataLock sync.RWMutex
  135. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  136. SavingsPlanDataRunning bool
  137. SavingsPlanDataLock sync.RWMutex
  138. ValidPricingKeys map[string]bool
  139. Clientset clustercache.ClusterCache
  140. BaseCPUPrice string
  141. BaseRAMPrice string
  142. BaseGPUPrice string
  143. BaseSpotCPUPrice string
  144. BaseSpotRAMPrice string
  145. BaseSpotGPUPrice string
  146. SpotLabelName string
  147. SpotLabelValue string
  148. SpotDataRegion string
  149. SpotDataBucket string
  150. SpotDataPrefix string
  151. ProjectID string
  152. DownloadPricingDataLock sync.RWMutex
  153. Config *ProviderConfig
  154. serviceAccountChecks *ServiceAccountChecks
  155. clusterManagementPrice float64
  156. clusterAccountId string
  157. clusterRegion string
  158. clusterProvisioner string
  159. *CustomProvider
  160. }
  // AWSAccessKey holds a static AWS key pair. Through its Retrieve method it
// satisfies the aws-sdk-go-v2 CredentialsProvider interface.
type AWSAccessKey struct {
	AccessKeyID     string `json:"aws_access_key_id"`
	SecretAccessKey string `json:"aws_secret_access_key"`
}
  166. // Retrieve returns a set of awsV2 credentials using the AWSAccessKey's key and secret.
  167. // This fulfils the awsV2.CredentialsProvider interface contract.
  168. func (accessKey AWSAccessKey) Retrieve(ctx context.Context) (awsSDK.Credentials, error) {
  169. return awsSDK.Credentials{
  170. AccessKeyID: accessKey.AccessKeyID,
  171. SecretAccessKey: accessKey.SecretAccessKey,
  172. }, nil
  173. }
  174. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains for the provided region
  175. func (accessKey AWSAccessKey) CreateConfig(region string) (awsSDK.Config, error) {
  176. var cfg awsSDK.Config
  177. var err error
  178. // If accessKey values have not been provided, attempt to load cfg from service key annotations
  179. if accessKey.AccessKeyID == "" && accessKey.SecretAccessKey == "" {
  180. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithRegion(region))
  181. if err != nil {
  182. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region from annotation %s: %s", region, err)
  183. }
  184. } else {
  185. // The AWS SDK v2 requires an object fulfilling the CredentialsProvider interface, which cloud.AWSAccessKey does
  186. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithCredentialsProvider(accessKey), config.WithRegion(region))
  187. if err != nil {
  188. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region %s: %s", region, err)
  189. }
  190. }
  191. return cfg, nil
  192. }
  193. // AWSPricing maps a k8s node to an AWS Pricing "product"
  194. type AWSPricing struct {
  195. Products map[string]*AWSProduct `json:"products"`
  196. Terms AWSPricingTerms `json:"terms"`
  197. }
  198. // AWSProduct represents a purchased SKU
  199. type AWSProduct struct {
  200. Sku string `json:"sku"`
  201. Attributes AWSProductAttributes `json:"attributes"`
  202. }
  // AWSProductAttributes represents metadata about the product used to map to a
// node. All values are kept as the strings found in the pricing document.
type AWSProductAttributes struct {
	Location        string `json:"location"`
	InstanceType    string `json:"instanceType"`
	Memory          string `json:"memory"`
	Storage         string `json:"storage"`
	VCpu            string `json:"vcpu"`
	UsageType       string `json:"usagetype"`
	OperatingSystem string `json:"operatingSystem"`
	PreInstalledSw  string `json:"preInstalledSw"`
	InstanceFamily  string `json:"instanceFamily"`
	CapacityStatus  string `json:"capacitystatus"`
	GPU             string `json:"gpu"` // GPU represents the number of GPU on the instance
}
  217. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  218. type AWSPricingTerms struct {
  219. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  220. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  221. }
  // AWSOfferTerm is a sku extension used to pay for the node.
type AWSOfferTerm struct {
	Sku             string                  `json:"sku"`
	PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
}

// String renders the term as "<sku>:<dimension>:<rate>,<dimension>:<rate>...".
// Dimensions appear in map iteration order, which is not stable across calls.
func (ot *AWSOfferTerm) String() string {
	strs := make([]string, 0, len(ot.PriceDimensions))
	for k, rc := range ot.PriceDimensions {
		strs = append(strs, fmt.Sprintf("%s:%s", k, rc.String()))
	}
	return fmt.Sprintf("%s:%s", ot.Sku, strings.Join(strs, ","))
}

// AWSRateCode encodes data about the price of a product.
type AWSRateCode struct {
	Unit         string          `json:"unit"`
	PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
}

// String renders the rate as "{unit: <unit>, pricePerUnit: <price>}". The
// closing brace was previously missing, which left the output unbalanced.
func (rc *AWSRateCode) String() string {
	return fmt.Sprintf("{unit: %s, pricePerUnit: %v}", rc.Unit, rc.PricePerUnit)
}

// AWSCurrencyCode is the localized currency. (TODO: support non-USD)
type AWSCurrencyCode struct {
	USD string `json:"USD,omitempty"`
	CNY string `json:"CNY,omitempty"`
}
  247. // AWSProductTerms represents the full terms of the product
  248. type AWSProductTerms struct {
  249. Sku string `json:"sku"`
  250. OnDemand *AWSOfferTerm `json:"OnDemand"`
  251. Reserved *AWSOfferTerm `json:"Reserved"`
  252. Memory string `json:"memory"`
  253. Storage string `json:"storage"`
  254. VCpu string `json:"vcpu"`
  255. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  256. PV *PV `json:"pv"`
  257. }
  // ClusterIdEnvVar is the environment variable in which one can manually set
// the ClusterId.
const ClusterIdEnvVar = "AWS_CLUSTER_ID"

// Rate codes appended to a node SKU to address a specific term / price
// dimension. The Cn variants are, by their names, the China-partition
// counterparts — confirm against the China pricing endpoint.
const (
	// OnDemandRateCode is appended to a node sku.
	OnDemandRateCode   = ".JRTCKXETXF"
	OnDemandRateCodeCn = ".99YE2YK9UR"
	// ReservedRateCode is appended to a node sku.
	ReservedRateCode = ".38NPMPTW36"
	// HourlyRateCode is appended to a node sku.
	HourlyRateCode   = ".6YS6EN2CT7"
	HourlyRateCodeCn = ".Q7UJUT2CE6"
)
  // volTypes maps in both directions between AWS EBS usage types and EBS
// volume types, as they would appear in a K8s storage class name and the EC2
// API. Both the IOPS usage type and the piops usage type map to io1, and io1
// maps back to the piops usage type.
var volTypes = map[string]string{
	// usage type -> volume type
	"EBS:VolumeUsage.gp2":    "gp2",
	"EBS:VolumeUsage.gp3":    "gp3",
	"EBS:VolumeUsage":        "standard",
	"EBS:VolumeUsage.sc1":    "sc1",
	"EBS:VolumeP-IOPS.piops": "io1",
	"EBS:VolumeUsage.st1":    "st1",
	"EBS:VolumeUsage.piops":  "io1",

	// volume type -> usage type
	"gp2":      "EBS:VolumeUsage.gp2",
	"gp3":      "EBS:VolumeUsage.gp3",
	"standard": "EBS:VolumeUsage",
	"sc1":      "EBS:VolumeUsage.sc1",
	"io1":      "EBS:VolumeUsage.piops",
	"st1":      "EBS:VolumeUsage.st1",
}
  // locationToRegion maps AWS location names, as they appear in billing and
// pricing data, to region identifiers. It covers every region in awsRegions;
// "Middle East (Bahrain)" was previously missing, so me-south-1 locations
// resolved to an empty region.
var locationToRegion = map[string]string{
	"US East (Ohio)":            "us-east-2",
	"US East (N. Virginia)":     "us-east-1",
	"US West (N. California)":   "us-west-1",
	"US West (Oregon)":          "us-west-2",
	"Asia Pacific (Hong Kong)":  "ap-east-1",
	"Asia Pacific (Mumbai)":     "ap-south-1",
	"Asia Pacific (Osaka)":      "ap-northeast-3",
	"Asia Pacific (Seoul)":      "ap-northeast-2",
	"Asia Pacific (Singapore)":  "ap-southeast-1",
	"Asia Pacific (Sydney)":     "ap-southeast-2",
	"Asia Pacific (Tokyo)":      "ap-northeast-1",
	"Asia Pacific (Jakarta)":    "ap-southeast-3",
	"Canada (Central)":          "ca-central-1",
	"China (Beijing)":           "cn-north-1",
	"China (Ningxia)":           "cn-northwest-1",
	"EU (Frankfurt)":            "eu-central-1",
	"EU (Ireland)":              "eu-west-1",
	"EU (London)":               "eu-west-2",
	"EU (Paris)":                "eu-west-3",
	"EU (Stockholm)":            "eu-north-1",
	"EU (Milan)":                "eu-south-1",
	"Middle East (Bahrain)":     "me-south-1",
	"South America (Sao Paulo)": "sa-east-1",
	"Africa (Cape Town)":        "af-south-1",
	"AWS GovCloud (US-East)":    "us-gov-east-1",
	"AWS GovCloud (US-West)":    "us-gov-west-1",
}
  // regionToBillingRegionCode maps region identifiers to the region code that
// prefixes AWS usage types in billing data (e.g. "USE2"). us-east-1 and the
// China regions map to the empty string. A lookup miss also yields "", which
// is indistinguishable from us-east-1, so every region in awsRegions must be
// present. me-south-1 ("MES1") was previously missing.
var regionToBillingRegionCode = map[string]string{
	"us-east-2":      "USE2",
	"us-east-1":      "",
	"us-west-1":      "USW1",
	"us-west-2":      "USW2",
	"ap-east-1":      "APE1",
	"ap-south-1":     "APS3",
	"ap-northeast-3": "APN3",
	"ap-northeast-2": "APN2",
	"ap-southeast-1": "APS1",
	"ap-southeast-2": "APS2",
	"ap-northeast-1": "APN1",
	"ap-southeast-3": "APS4",
	"ca-central-1":   "CAN1",
	"cn-north-1":     "",
	"cn-northwest-1": "",
	"eu-central-1":   "EUC1",
	"eu-west-1":      "EU",
	"eu-west-2":      "EUW2",
	"eu-west-3":      "EUW3",
	"eu-north-1":     "EUN1",
	"eu-south-1":     "EUS1",
	"me-south-1":     "MES1",
	"sa-east-1":      "SAE1",
	"af-south-1":     "AFS1",
	"us-gov-east-1":  "UGE1",
	"us-gov-west-1":  "UGW1",
}
  342. var loadedAWSSecret bool = false
  343. var awsSecret *AWSAccessKey = nil
  344. func (aws *AWS) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
  345. return ""
  346. }
  347. // KubeAttrConversion maps the k8s labels for region to an aws region
  348. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  349. operatingSystem = strings.ToLower(operatingSystem)
  350. region := locationToRegion[location]
  351. return region + "," + instanceType + "," + operatingSystem
  352. }
  // AwsSpotFeedInfo contains configuration for spot feed integration. It is the
// JSON payload decoded by UpdateConfig for SpotInfoUpdateType. Note that
// AccountID is serialized as "projectID".
type AwsSpotFeedInfo struct {
	BucketName       string `json:"bucketName"`
	Prefix           string `json:"prefix"`
	Region           string `json:"region"`
	AccountID        string `json:"projectID"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	SpotLabel        string `json:"spotLabel"`
	SpotLabelValue   string `json:"spotLabelValue"`
}
  // AwsAthenaInfo contains configuration for CUR integration via Athena. It is
// the JSON payload decoded by UpdateConfig for AthenaInfoUpdateType. Note that
// AccountID is serialized as "projectID".
type AwsAthenaInfo struct {
	AthenaBucketName string `json:"athenaBucketName"`
	AthenaRegion     string `json:"athenaRegion"`
	AthenaDatabase   string `json:"athenaDatabase"`
	AthenaTable      string `json:"athenaTable"`
	AthenaWorkgroup  string `json:"athenaWorkgroup"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	AccountID        string `json:"projectID"`
	MasterPayerARN   string `json:"masterPayerARN"`
}
  376. // IsEmpty returns true if all fields in config are empty, false if not.
  377. func (aai *AwsAthenaInfo) IsEmpty() bool {
  378. return aai.AthenaBucketName == "" &&
  379. aai.AthenaRegion == "" &&
  380. aai.AthenaDatabase == "" &&
  381. aai.AthenaTable == "" &&
  382. aai.AthenaWorkgroup == "" &&
  383. aai.ServiceKeyName == "" &&
  384. aai.ServiceKeySecret == "" &&
  385. aai.AccountID == "" &&
  386. aai.MasterPayerARN == ""
  387. }
  388. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains
  389. func (aai *AwsAthenaInfo) CreateConfig() (awsSDK.Config, error) {
  390. keyProvider := AWSAccessKey{AccessKeyID: aai.ServiceKeyName, SecretAccessKey: aai.ServiceKeySecret}
  391. cfg, err := keyProvider.CreateConfig(aai.AthenaRegion)
  392. if err != nil {
  393. return cfg, err
  394. }
  395. if aai.MasterPayerARN != "" {
  396. // Create the credentials from AssumeRoleProvider to assume the role
  397. // referenced by the roleARN.
  398. stsSvc := sts.NewFromConfig(cfg)
  399. creds := stscreds.NewAssumeRoleProvider(stsSvc, aai.MasterPayerARN)
  400. cfg.Credentials = awsSDK.NewCredentialsCache(creds)
  401. }
  402. return cfg, nil
  403. }
  404. func (aws *AWS) GetManagementPlatform() (string, error) {
  405. nodes := aws.Clientset.GetAllNodes()
  406. if len(nodes) > 0 {
  407. n := nodes[0]
  408. version := n.Status.NodeInfo.KubeletVersion
  409. if strings.Contains(version, "eks") {
  410. return "eks", nil
  411. }
  412. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  413. return "kops", nil
  414. }
  415. }
  416. return "", nil
  417. }
  418. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  419. c, err := aws.Config.GetCustomPricingData()
  420. if err != nil {
  421. return nil, err
  422. }
  423. if c.Discount == "" {
  424. c.Discount = "0%"
  425. }
  426. if c.NegotiatedDiscount == "" {
  427. c.NegotiatedDiscount = "0%"
  428. }
  429. if c.ShareTenancyCosts == "" {
  430. c.ShareTenancyCosts = defaultShareTenancyCost
  431. }
  432. return c, nil
  433. }
  434. // GetAWSAccessKey generate an AWSAccessKey object from the config
  435. func (aws *AWS) GetAWSAccessKey() (*AWSAccessKey, error) {
  436. config, err := aws.GetConfig()
  437. if err != nil {
  438. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  439. }
  440. err = aws.ConfigureAuthWith(config)
  441. if err != nil {
  442. return nil, fmt.Errorf("error configuring Cloud Provider %s", err)
  443. }
  444. //Look for service key values in env if not present in config
  445. if config.ServiceKeyName == "" {
  446. config.ServiceKeyName = env.GetAWSAccessKeyID()
  447. }
  448. if config.ServiceKeySecret == "" {
  449. config.ServiceKeySecret = env.GetAWSAccessKeySecret()
  450. }
  451. if config.ServiceKeyName == "" && config.ServiceKeySecret == "" {
  452. log.DedupedInfof(1, "missing service key values for AWS cloud integration attempting to use service account integration")
  453. }
  454. return &AWSAccessKey{AccessKeyID: config.ServiceKeyName, SecretAccessKey: config.ServiceKeySecret}, nil
  455. }
  456. // GetAWSAthenaInfo generate an AWSAthenaInfo object from the config
  457. func (aws *AWS) GetAWSAthenaInfo() (*AwsAthenaInfo, error) {
  458. config, err := aws.GetConfig()
  459. if err != nil {
  460. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  461. }
  462. aak, err := aws.GetAWSAccessKey()
  463. if err != nil {
  464. return nil, err
  465. }
  466. return &AwsAthenaInfo{
  467. AthenaBucketName: config.AthenaBucketName,
  468. AthenaRegion: config.AthenaRegion,
  469. AthenaDatabase: config.AthenaDatabase,
  470. AthenaTable: config.AthenaTable,
  471. AthenaWorkgroup: config.AthenaWorkgroup,
  472. ServiceKeyName: aak.AccessKeyID,
  473. ServiceKeySecret: aak.SecretAccessKey,
  474. AccountID: config.AthenaProjectID,
  475. MasterPayerARN: config.MasterPayerARN,
  476. }, nil
  477. }
  478. func (aws *AWS) UpdateConfigFromConfigMap(cm map[string]string) (*CustomPricing, error) {
  479. return aws.Config.UpdateFromMap(cm)
  480. }
  481. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  482. return aws.Config.Update(func(c *CustomPricing) error {
  483. if updateType == SpotInfoUpdateType {
  484. asfi := AwsSpotFeedInfo{}
  485. err := json.NewDecoder(r).Decode(&asfi)
  486. if err != nil {
  487. return err
  488. }
  489. c.ServiceKeyName = asfi.ServiceKeyName
  490. if asfi.ServiceKeySecret != "" {
  491. c.ServiceKeySecret = asfi.ServiceKeySecret
  492. }
  493. c.SpotDataPrefix = asfi.Prefix
  494. c.SpotDataBucket = asfi.BucketName
  495. c.ProjectID = asfi.AccountID
  496. c.SpotDataRegion = asfi.Region
  497. c.SpotLabel = asfi.SpotLabel
  498. c.SpotLabelValue = asfi.SpotLabelValue
  499. } else if updateType == AthenaInfoUpdateType {
  500. aai := AwsAthenaInfo{}
  501. err := json.NewDecoder(r).Decode(&aai)
  502. if err != nil {
  503. return err
  504. }
  505. c.AthenaBucketName = aai.AthenaBucketName
  506. c.AthenaRegion = aai.AthenaRegion
  507. c.AthenaDatabase = aai.AthenaDatabase
  508. c.AthenaTable = aai.AthenaTable
  509. c.AthenaWorkgroup = aai.AthenaWorkgroup
  510. c.ServiceKeyName = aai.ServiceKeyName
  511. if aai.ServiceKeySecret != "" {
  512. c.ServiceKeySecret = aai.ServiceKeySecret
  513. }
  514. if aai.MasterPayerARN != "" {
  515. c.MasterPayerARN = aai.MasterPayerARN
  516. }
  517. c.AthenaProjectID = aai.AccountID
  518. } else {
  519. a := make(map[string]interface{})
  520. err := json.NewDecoder(r).Decode(&a)
  521. if err != nil {
  522. return err
  523. }
  524. for k, v := range a {
  525. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  526. vstr, ok := v.(string)
  527. if ok {
  528. err := SetCustomPricingField(c, kUpper, vstr)
  529. if err != nil {
  530. return err
  531. }
  532. } else {
  533. return fmt.Errorf("type error while updating config for %s", kUpper)
  534. }
  535. }
  536. }
  537. if env.IsRemoteEnabled() {
  538. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  539. if err != nil {
  540. return err
  541. }
  542. }
  543. return nil
  544. })
  545. }
  // awsKey carries the node labels and spot-label settings needed to derive a
// node's pricing key (see Features) and instance ID (see ID).
type awsKey struct {
	SpotLabelName  string
	SpotLabelValue string
	Labels         map[string]string
	ProviderID     string
}
  552. func (k *awsKey) GPUCount() int {
  553. return 0
  554. }
  555. func (k *awsKey) GPUType() string {
  556. return ""
  557. }
  558. func (k *awsKey) ID() string {
  559. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  560. if matchNum == 2 {
  561. return group
  562. }
  563. }
  564. log.Warnf("Could not find instance ID in \"%s\"", k.ProviderID)
  565. return ""
  566. }
  567. func (k *awsKey) Features() string {
  568. instanceType, _ := util.GetInstanceType(k.Labels)
  569. operatingSystem, _ := util.GetOperatingSystem(k.Labels)
  570. region, _ := util.GetRegion(k.Labels)
  571. key := region + "," + instanceType + "," + operatingSystem
  572. usageType := k.getUsageType(k.Labels)
  573. spotKey := key + "," + usageType
  574. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  575. return spotKey
  576. }
  577. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  578. return spotKey
  579. }
  580. if usageType == PreemptibleType {
  581. return spotKey
  582. }
  583. return key
  584. }
  585. func (k *awsKey) getUsageType(labels map[string]string) string {
  586. if _, ok := labels["eks.amazonaws.com/capacityType"]; ok && labels["eks.amazonaws.com/capacityType"] == "SPOT" {
  587. return PreemptibleType
  588. }
  589. return ""
  590. }
  591. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  592. pricing, ok := aws.Pricing[pvk.Features()]
  593. if !ok {
  594. log.Debugf("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  595. return &PV{}, nil
  596. }
  597. return pricing.PV, nil
  598. }
  // awsPVKey carries what is needed to derive a persistent volume's pricing key
// (see Features): its labels, storage class parameters, a fallback region, and
// its provider volume ID.
type awsPVKey struct {
	Labels                 map[string]string
	StorageClassParameters map[string]string
	StorageClassName       string
	Name                   string
	DefaultRegion          string
	ProviderID             string
}
  607. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  608. providerID := ""
  609. if pv.Spec.AWSElasticBlockStore != nil {
  610. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  611. } else if pv.Spec.CSI != nil {
  612. providerID = pv.Spec.CSI.VolumeHandle
  613. }
  614. return &awsPVKey{
  615. Labels: pv.Labels,
  616. StorageClassName: pv.Spec.StorageClassName,
  617. StorageClassParameters: parameters,
  618. Name: pv.Name,
  619. DefaultRegion: defaultRegion,
  620. ProviderID: providerID,
  621. }
  622. }
  623. func (key *awsPVKey) ID() string {
  624. return key.ProviderID
  625. }
  626. func (key *awsPVKey) GetStorageClass() string {
  627. return key.StorageClassName
  628. }
  629. func (key *awsPVKey) Features() string {
  630. storageClass := key.StorageClassParameters["type"]
  631. if storageClass == "standard" {
  632. storageClass = "gp2"
  633. }
  634. // Storage class names are generally EBS volume types (gp2)
  635. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  636. // Converts between the 2
  637. region, ok := util.GetRegion(key.Labels)
  638. if !ok {
  639. region = key.DefaultRegion
  640. }
  641. class, ok := volTypes[storageClass]
  642. if !ok {
  643. log.Debugf("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  644. }
  645. return region + "," + class
  646. }
  647. // GetKey maps node labels to information needed to retrieve pricing data
  648. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  649. return &awsKey{
  650. SpotLabelName: aws.SpotLabelName,
  651. SpotLabelValue: aws.SpotLabelValue,
  652. Labels: labels,
  653. ProviderID: labels["providerID"],
  654. }
  655. }
  656. func (aws *AWS) isPreemptible(key string) bool {
  657. s := strings.Split(key, ",")
  658. if len(s) == 4 && s[3] == PreemptibleType {
  659. return true
  660. }
  661. return false
  662. }
  663. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  664. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  665. }
  666. // Use the pricing data from the current region. Fall back to using all region data if needed.
  667. func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, error) {
  668. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/"
  669. region := ""
  670. multiregion := false
  671. for _, n := range nodeList {
  672. labels := n.GetLabels()
  673. currentNodeRegion := ""
  674. if r, ok := util.GetRegion(labels); ok {
  675. currentNodeRegion = r
  676. // Switch to Chinese endpoint for regions with the Chinese prefix
  677. if strings.HasPrefix(currentNodeRegion, "cn-") {
  678. pricingURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/AmazonEC2/current/"
  679. }
  680. } else {
  681. multiregion = true // We weren't able to detect the node's region, so pull all data.
  682. break
  683. }
  684. if region == "" { // We haven't set a region yet
  685. region = currentNodeRegion
  686. } else if region != "" && currentNodeRegion != region { // If two nodes have different regions here, we'll need to fetch all pricing data.
  687. multiregion = true
  688. break
  689. }
  690. }
  691. // Chinese multiregion endpoint only contains data for Chinese regions and Chinese regions are excluded from other endpoint
  692. if region != "" && !multiregion {
  693. pricingURL += region + "/"
  694. }
  695. pricingURL += "index.json"
  696. if env.GetAWSPricingURL() != "" { // Allow override of pricing URL
  697. pricingURL = env.GetAWSPricingURL()
  698. }
  699. log.Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  700. resp, err := http.Get(pricingURL)
  701. if err != nil {
  702. log.Errorf("Bogus fetch of \"%s\": %v", pricingURL, err)
  703. return nil, pricingURL, err
  704. }
  705. return resp, pricingURL, err
  706. }
  707. // SpotRefreshEnabled determines whether the required configs to run the spot feed query have been set up
  708. func (aws *AWS) SpotRefreshEnabled() bool {
  709. // Need a valid value for at least one of these fields to consider spot pricing as enabled
  710. return len(aws.SpotDataBucket) != 0 || len(aws.SpotDataRegion) != 0 || len(aws.ProjectID) != 0
  711. }
  712. // DownloadPricingData fetches data from the AWS Pricing API
  713. func (aws *AWS) DownloadPricingData() error {
  714. aws.DownloadPricingDataLock.Lock()
  715. defer aws.DownloadPricingDataLock.Unlock()
  716. c, err := aws.Config.GetCustomPricingData()
  717. if err != nil {
  718. log.Errorf("Error downloading default pricing data: %s", err.Error())
  719. }
  720. aws.BaseCPUPrice = c.CPU
  721. aws.BaseRAMPrice = c.RAM
  722. aws.BaseGPUPrice = c.GPU
  723. aws.BaseSpotCPUPrice = c.SpotCPU
  724. aws.BaseSpotRAMPrice = c.SpotRAM
  725. aws.BaseSpotGPUPrice = c.SpotGPU
  726. aws.SpotLabelName = c.SpotLabel
  727. aws.SpotLabelValue = c.SpotLabelValue
  728. aws.SpotDataBucket = c.SpotDataBucket
  729. aws.SpotDataPrefix = c.SpotDataPrefix
  730. aws.ProjectID = c.ProjectID
  731. aws.SpotDataRegion = c.SpotDataRegion
  732. aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
  733. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  734. log.Warnf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  735. }
  736. nodeList := aws.Clientset.GetAllNodes()
  737. inputkeys := make(map[string]bool)
  738. for _, n := range nodeList {
  739. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  740. aws.clusterManagementPrice = 0.10
  741. aws.clusterProvisioner = "EKS"
  742. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  743. aws.clusterProvisioner = "KOPS"
  744. }
  745. labels := n.GetObjectMeta().GetLabels()
  746. key := aws.GetKey(labels, n)
  747. inputkeys[key.Features()] = true
  748. }
  749. pvList := aws.Clientset.GetAllPersistentVolumes()
  750. storageClasses := aws.Clientset.GetAllStorageClasses()
  751. storageClassMap := make(map[string]map[string]string)
  752. for _, storageClass := range storageClasses {
  753. params := storageClass.Parameters
  754. storageClassMap[storageClass.ObjectMeta.Name] = params
  755. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  756. storageClassMap["default"] = params
  757. storageClassMap[""] = params
  758. }
  759. }
  760. pvkeys := make(map[string]PVKey)
  761. for _, pv := range pvList {
  762. params, ok := storageClassMap[pv.Spec.StorageClassName]
  763. if !ok {
  764. log.Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  765. continue
  766. }
  767. key := aws.GetPVKey(pv, params, "")
  768. pvkeys[key.Features()] = key
  769. }
  770. // RIDataRunning establishes the existence of the goroutine. Since it's possible we
  771. // run multiple downloads, we don't want to create multiple go routines if one already exists
  772. if !aws.RIDataRunning {
  773. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  774. if err != nil {
  775. log.Errorf("Failed to lookup reserved instance data: %s", err.Error())
  776. } else { // If we make one successful run, check on new reservation data every hour
  777. go func() {
  778. defer errs.HandlePanic()
  779. aws.RIDataRunning = true
  780. for {
  781. log.Infof("Reserved Instance watcher running... next update in 1h")
  782. time.Sleep(time.Hour)
  783. err := aws.GetReservationDataFromAthena()
  784. if err != nil {
  785. log.Infof("Error updating RI data: %s", err.Error())
  786. }
  787. }
  788. }()
  789. }
  790. }
  791. if !aws.SavingsPlanDataRunning {
  792. err = aws.GetSavingsPlanDataFromAthena()
  793. if err != nil {
  794. log.Errorf("Failed to lookup savings plan data: %s", err.Error())
  795. } else {
  796. go func() {
  797. defer errs.HandlePanic()
  798. aws.SavingsPlanDataRunning = true
  799. for {
  800. log.Infof("Savings Plan watcher running... next update in 1h")
  801. time.Sleep(time.Hour)
  802. err := aws.GetSavingsPlanDataFromAthena()
  803. if err != nil {
  804. log.Infof("Error updating Savings Plan data: %s", err.Error())
  805. }
  806. }
  807. }()
  808. }
  809. }
  810. aws.Pricing = make(map[string]*AWSProductTerms)
  811. aws.ValidPricingKeys = make(map[string]bool)
  812. skusToKeys := make(map[string]string)
  813. resp, pricingURL, err := aws.getRegionPricing(nodeList)
  814. if err != nil {
  815. return err
  816. }
  817. dec := json.NewDecoder(resp.Body)
  818. for {
  819. t, err := dec.Token()
  820. if err == io.EOF {
  821. log.Infof("done loading \"%s\"\n", pricingURL)
  822. break
  823. } else if err != nil {
  824. log.Errorf("error parsing response json %v", resp.Body)
  825. break
  826. }
  827. if t == "products" {
  828. _, err := dec.Token() // this should parse the opening "{""
  829. if err != nil {
  830. return err
  831. }
  832. for dec.More() {
  833. _, err := dec.Token() // the sku token
  834. if err != nil {
  835. return err
  836. }
  837. product := &AWSProduct{}
  838. err = dec.Decode(&product)
  839. if err != nil {
  840. log.Errorf("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  841. break
  842. }
  843. if product.Attributes.PreInstalledSw == "NA" &&
  844. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
  845. product.Attributes.CapacityStatus == "Used" {
  846. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  847. spotKey := key + ",preemptible"
  848. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  849. productTerms := &AWSProductTerms{
  850. Sku: product.Sku,
  851. Memory: product.Attributes.Memory,
  852. Storage: product.Attributes.Storage,
  853. VCpu: product.Attributes.VCpu,
  854. GPU: product.Attributes.GPU,
  855. }
  856. aws.Pricing[key] = productTerms
  857. aws.Pricing[spotKey] = productTerms
  858. skusToKeys[product.Sku] = key
  859. }
  860. aws.ValidPricingKeys[key] = true
  861. aws.ValidPricingKeys[spotKey] = true
  862. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  863. // UsageTypes may be prefixed with a region code - we're removing this when using
  864. // volTypes to keep lookups generic
  865. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  866. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  867. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  868. spotKey := key + ",preemptible"
  869. pv := &PV{
  870. Class: volTypes[usageTypeNoRegion],
  871. Region: locationToRegion[product.Attributes.Location],
  872. }
  873. productTerms := &AWSProductTerms{
  874. Sku: product.Sku,
  875. PV: pv,
  876. }
  877. aws.Pricing[key] = productTerms
  878. aws.Pricing[spotKey] = productTerms
  879. skusToKeys[product.Sku] = key
  880. aws.ValidPricingKeys[key] = true
  881. aws.ValidPricingKeys[spotKey] = true
  882. }
  883. }
  884. }
  885. if t == "terms" {
  886. _, err := dec.Token() // this should parse the opening "{""
  887. if err != nil {
  888. return err
  889. }
  890. termType, err := dec.Token()
  891. if err != nil {
  892. return err
  893. }
  894. if termType == "OnDemand" {
  895. _, err := dec.Token()
  896. if err != nil { // again, should parse an opening "{"
  897. return err
  898. }
  899. for dec.More() {
  900. sku, err := dec.Token()
  901. if err != nil {
  902. return err
  903. }
  904. _, err = dec.Token() // another opening "{"
  905. if err != nil {
  906. return err
  907. }
  908. skuOnDemand, err := dec.Token()
  909. if err != nil {
  910. return err
  911. }
  912. offerTerm := &AWSOfferTerm{}
  913. err = dec.Decode(&offerTerm)
  914. if err != nil {
  915. log.Errorf("Error decoding AWS Offer Term: " + err.Error())
  916. }
  917. key, ok := skusToKeys[sku.(string)]
  918. spotKey := key + ",preemptible"
  919. if ok {
  920. aws.Pricing[key].OnDemand = offerTerm
  921. aws.Pricing[spotKey].OnDemand = offerTerm
  922. var cost string
  923. if sku.(string)+OnDemandRateCode == skuOnDemand {
  924. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  925. } else if sku.(string)+OnDemandRateCodeCn == skuOnDemand {
  926. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCodeCn+HourlyRateCodeCn].PricePerUnit.CNY
  927. }
  928. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  929. // If the specific UsageType is the per IO cost used on io1 volumes
  930. // we need to add the per IO cost to the io1 PV cost
  931. // Add the per IO cost to the PV object for the io1 volume type
  932. aws.Pricing[key].PV.CostPerIO = cost
  933. } else if strings.Contains(key, "EBS:Volume") {
  934. // If volume, we need to get hourly cost and add it to the PV object
  935. costFloat, _ := strconv.ParseFloat(cost, 64)
  936. hourlyPrice := costFloat / 730
  937. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  938. }
  939. }
  940. _, err = dec.Token()
  941. if err != nil {
  942. return err
  943. }
  944. }
  945. _, err = dec.Token()
  946. if err != nil {
  947. return err
  948. }
  949. }
  950. }
  951. }
  952. log.Infof("Finished downloading \"%s\"", pricingURL)
  953. if !aws.SpotRefreshEnabled() {
  954. return nil
  955. }
  956. // Always run spot pricing refresh when performing download
  957. aws.refreshSpotPricing(true)
  958. // Only start a single refresh goroutine
  959. if !aws.SpotRefreshRunning {
  960. aws.SpotRefreshRunning = true
  961. go func() {
  962. defer errs.HandlePanic()
  963. for {
  964. log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  965. time.Sleep(SpotRefreshDuration)
  966. // Reoccurring refresh checks update times
  967. aws.refreshSpotPricing(false)
  968. }
  969. }()
  970. }
  971. return nil
  972. }
  973. func (aws *AWS) refreshSpotPricing(force bool) {
  974. aws.SpotPricingLock.Lock()
  975. defer aws.SpotPricingLock.Unlock()
  976. now := time.Now().UTC()
  977. updateTime := now.Add(-SpotRefreshDuration)
  978. // Return if there was an update time set and an hour hasn't elapsed
  979. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  980. return
  981. }
  982. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
  983. if err != nil {
  984. log.Warnf("Skipping AWS spot data download: %s", err.Error())
  985. aws.SpotPricingError = err
  986. return
  987. }
  988. aws.SpotPricingError = nil
  989. // update time last updated
  990. aws.SpotPricingUpdatedAt = &now
  991. aws.SpotPricingByInstanceID = sp
  992. }
  993. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  994. func (aws *AWS) NetworkPricing() (*Network, error) {
  995. cpricing, err := aws.Config.GetCustomPricingData()
  996. if err != nil {
  997. return nil, err
  998. }
  999. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  1000. if err != nil {
  1001. return nil, err
  1002. }
  1003. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  1004. if err != nil {
  1005. return nil, err
  1006. }
  1007. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  1008. if err != nil {
  1009. return nil, err
  1010. }
  1011. return &Network{
  1012. ZoneNetworkEgressCost: znec,
  1013. RegionNetworkEgressCost: rnec,
  1014. InternetNetworkEgressCost: inec,
  1015. }, nil
  1016. }
  1017. func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
  1018. fffrc := 0.025
  1019. afrc := 0.010
  1020. lbidc := 0.008
  1021. numForwardingRules := 1.0
  1022. dataIngressGB := 0.0
  1023. var totalCost float64
  1024. if numForwardingRules < 5 {
  1025. totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
  1026. } else {
  1027. totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
  1028. }
  1029. return &LoadBalancer{
  1030. Cost: totalCost,
  1031. }, nil
  1032. }
  1033. // AllNodePricing returns all the billing data fetched.
  1034. func (aws *AWS) AllNodePricing() (interface{}, error) {
  1035. aws.DownloadPricingDataLock.RLock()
  1036. defer aws.DownloadPricingDataLock.RUnlock()
  1037. return aws.Pricing, nil
  1038. }
  1039. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  1040. aws.SpotPricingLock.RLock()
  1041. defer aws.SpotPricingLock.RUnlock()
  1042. info, ok := aws.SpotPricingByInstanceID[instanceID]
  1043. return info, ok
  1044. }
  1045. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  1046. aws.RIDataLock.RLock()
  1047. defer aws.RIDataLock.RUnlock()
  1048. data, ok := aws.RIPricingByInstanceID[instanceID]
  1049. return data, ok
  1050. }
  1051. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  1052. aws.SavingsPlanDataLock.RLock()
  1053. defer aws.SavingsPlanDataLock.RUnlock()
  1054. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  1055. return data, ok
  1056. }
  1057. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  1058. key := k.Features()
  1059. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  1060. var spotcost string
  1061. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  1062. arr := strings.Split(spotInfo.Charge, " ")
  1063. if len(arr) == 2 {
  1064. spotcost = arr[0]
  1065. } else {
  1066. log.Infof("Spot data for node %s is missing", k.ID())
  1067. }
  1068. return &Node{
  1069. Cost: spotcost,
  1070. VCPU: terms.VCpu,
  1071. RAM: terms.Memory,
  1072. GPU: terms.GPU,
  1073. Storage: terms.Storage,
  1074. BaseCPUPrice: aws.BaseCPUPrice,
  1075. BaseRAMPrice: aws.BaseRAMPrice,
  1076. BaseGPUPrice: aws.BaseGPUPrice,
  1077. UsageType: PreemptibleType,
  1078. }, nil
  1079. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  1080. log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
  1081. return &Node{
  1082. VCPU: terms.VCpu,
  1083. VCPUCost: aws.BaseSpotCPUPrice,
  1084. RAM: terms.Memory,
  1085. GPU: terms.GPU,
  1086. Storage: terms.Storage,
  1087. BaseCPUPrice: aws.BaseCPUPrice,
  1088. BaseRAMPrice: aws.BaseRAMPrice,
  1089. BaseGPUPrice: aws.BaseGPUPrice,
  1090. UsageType: PreemptibleType,
  1091. }, nil
  1092. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  1093. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  1094. return &Node{
  1095. Cost: strCost,
  1096. VCPU: terms.VCpu,
  1097. RAM: terms.Memory,
  1098. GPU: terms.GPU,
  1099. Storage: terms.Storage,
  1100. BaseCPUPrice: aws.BaseCPUPrice,
  1101. BaseRAMPrice: aws.BaseRAMPrice,
  1102. BaseGPUPrice: aws.BaseGPUPrice,
  1103. UsageType: usageType,
  1104. }, nil
  1105. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  1106. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  1107. return &Node{
  1108. Cost: strCost,
  1109. VCPU: terms.VCpu,
  1110. RAM: terms.Memory,
  1111. GPU: terms.GPU,
  1112. Storage: terms.Storage,
  1113. BaseCPUPrice: aws.BaseCPUPrice,
  1114. BaseRAMPrice: aws.BaseRAMPrice,
  1115. BaseGPUPrice: aws.BaseGPUPrice,
  1116. UsageType: usageType,
  1117. }, nil
  1118. }
  1119. var cost string
  1120. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  1121. if ok {
  1122. cost = c.PricePerUnit.USD
  1123. } else {
  1124. // Check for Chinese pricing before throwing error
  1125. c, ok = terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCodeCn+HourlyRateCodeCn]
  1126. if ok {
  1127. cost = c.PricePerUnit.CNY
  1128. } else {
  1129. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  1130. }
  1131. }
  1132. return &Node{
  1133. Cost: cost,
  1134. VCPU: terms.VCpu,
  1135. RAM: terms.Memory,
  1136. GPU: terms.GPU,
  1137. Storage: terms.Storage,
  1138. BaseCPUPrice: aws.BaseCPUPrice,
  1139. BaseRAMPrice: aws.BaseRAMPrice,
  1140. BaseGPUPrice: aws.BaseGPUPrice,
  1141. UsageType: usageType,
  1142. }, nil
  1143. }
  1144. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  1145. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  1146. aws.DownloadPricingDataLock.RLock()
  1147. defer aws.DownloadPricingDataLock.RUnlock()
  1148. key := k.Features()
  1149. usageType := "ondemand"
  1150. if aws.isPreemptible(key) {
  1151. usageType = PreemptibleType
  1152. }
  1153. terms, ok := aws.Pricing[key]
  1154. if ok {
  1155. return aws.createNode(terms, usageType, k)
  1156. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  1157. aws.DownloadPricingDataLock.RUnlock()
  1158. err := aws.DownloadPricingData()
  1159. aws.DownloadPricingDataLock.RLock()
  1160. if err != nil {
  1161. return &Node{
  1162. Cost: aws.BaseCPUPrice,
  1163. BaseCPUPrice: aws.BaseCPUPrice,
  1164. BaseRAMPrice: aws.BaseRAMPrice,
  1165. BaseGPUPrice: aws.BaseGPUPrice,
  1166. UsageType: usageType,
  1167. UsesBaseCPUPrice: true,
  1168. }, err
  1169. }
  1170. terms, termsOk := aws.Pricing[key]
  1171. if !termsOk {
  1172. return &Node{
  1173. Cost: aws.BaseCPUPrice,
  1174. BaseCPUPrice: aws.BaseCPUPrice,
  1175. BaseRAMPrice: aws.BaseRAMPrice,
  1176. BaseGPUPrice: aws.BaseGPUPrice,
  1177. UsageType: usageType,
  1178. UsesBaseCPUPrice: true,
  1179. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  1180. }
  1181. return aws.createNode(terms, usageType, k)
  1182. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  1183. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  1184. }
  1185. }
  1186. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  1187. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  1188. defaultClusterName := "AWS Cluster #1"
  1189. c, err := awsProvider.GetConfig()
  1190. if err != nil {
  1191. return nil, err
  1192. }
  1193. remoteEnabled := env.IsRemoteEnabled()
  1194. makeStructure := func(clusterName string) (map[string]string, error) {
  1195. m := make(map[string]string)
  1196. m["name"] = clusterName
  1197. m["provider"] = kubecost.AWSProvider
  1198. m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable else where
  1199. m["region"] = awsProvider.clusterRegion
  1200. m["id"] = env.GetClusterID()
  1201. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1202. m["provisioner"] = awsProvider.clusterProvisioner
  1203. return m, nil
  1204. }
  1205. if c.ClusterName != "" {
  1206. return makeStructure(c.ClusterName)
  1207. }
  1208. maybeClusterId := env.GetAWSClusterID()
  1209. if len(maybeClusterId) != 0 {
  1210. log.Infof("Returning \"%s\" as ClusterName", maybeClusterId)
  1211. return makeStructure(maybeClusterId)
  1212. }
  1213. log.Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  1214. return makeStructure(defaultClusterName)
  1215. }
  1216. // updates the authentication to the latest values (via config or secret)
  1217. func (aws *AWS) ConfigureAuth() error {
  1218. c, err := aws.Config.GetCustomPricingData()
  1219. if err != nil {
  1220. log.Errorf("Error downloading default pricing data: %s", err.Error())
  1221. }
  1222. return aws.ConfigureAuthWith(c)
  1223. }
  1224. // updates the authentication to the latest values (via config or secret)
  1225. func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
  1226. accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
  1227. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1228. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1229. if err != nil {
  1230. return err
  1231. }
  1232. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1233. if err != nil {
  1234. return err
  1235. }
  1236. }
  1237. return nil
  1238. }
  1239. // Gets the aws key id and secret
  1240. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  1241. // 1. Check config values first (set from frontend UI)
  1242. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  1243. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1244. Message: "AWS ServiceKey exists",
  1245. Status: true,
  1246. })
  1247. return cp.ServiceKeyName, cp.ServiceKeySecret
  1248. }
  1249. // 2. Check for secret
  1250. s, _ := aws.loadAWSAuthSecret(forceReload)
  1251. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1252. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1253. Message: "AWS ServiceKey exists",
  1254. Status: true,
  1255. })
  1256. return s.AccessKeyID, s.SecretAccessKey
  1257. }
  1258. // 3. Fall back to env vars
  1259. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1260. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1261. Message: "AWS ServiceKey exists",
  1262. Status: false,
  1263. })
  1264. } else {
  1265. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1266. Message: "AWS ServiceKey exists",
  1267. Status: true,
  1268. })
  1269. }
  1270. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1271. }
  1272. // Load once and cache the result (even on failure). This is an install time secret, so
  1273. // we don't expect the secret to change. If it does, however, we can force reload using
  1274. // the input parameter.
  1275. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1276. if !force && loadedAWSSecret {
  1277. return awsSecret, nil
  1278. }
  1279. loadedAWSSecret = true
  1280. exists, err := fileutil.FileExists(authSecretPath)
  1281. if !exists || err != nil {
  1282. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1283. }
  1284. result, err := os.ReadFile(authSecretPath)
  1285. if err != nil {
  1286. return nil, err
  1287. }
  1288. var ak AWSAccessKey
  1289. err = json.Unmarshal(result, &ak)
  1290. if err != nil {
  1291. return nil, err
  1292. }
  1293. awsSecret = &ak
  1294. return awsSecret, nil
  1295. }
  1296. func (aws *AWS) getAddressesForRegion(ctx context.Context, region string) (*ec2.DescribeAddressesOutput, error) {
  1297. aak, err := aws.GetAWSAccessKey()
  1298. if err != nil {
  1299. return nil, err
  1300. }
  1301. cfg, err := aak.CreateConfig(region)
  1302. if err != nil {
  1303. return nil, err
  1304. }
  1305. cli := ec2.NewFromConfig(cfg)
  1306. return cli.DescribeAddresses(ctx, &ec2.DescribeAddressesInput{})
  1307. }
  1308. // GetAddresses retrieves EC2 addresses
  1309. func (aws *AWS) GetAddresses() ([]byte, error) {
  1310. aws.ConfigureAuth() // load authentication data into env vars
  1311. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1312. errorCh := make(chan error, len(awsRegions))
  1313. var wg sync.WaitGroup
  1314. wg.Add(len(awsRegions))
  1315. // Get volumes from each AWS region
  1316. for _, r := range awsRegions {
  1317. // Fetch IP address response and send results and errors to their
  1318. // respective channels
  1319. go func(region string) {
  1320. defer wg.Done()
  1321. defer errs.HandlePanic()
  1322. // Query for first page of volume results
  1323. resp, err := aws.getAddressesForRegion(context.TODO(), region)
  1324. if err != nil {
  1325. errorCh <- err
  1326. return
  1327. }
  1328. addressCh <- resp
  1329. }(r)
  1330. }
  1331. // Close the result channels after everything has been sent
  1332. go func() {
  1333. defer errs.HandlePanic()
  1334. wg.Wait()
  1335. close(errorCh)
  1336. close(addressCh)
  1337. }()
  1338. var addresses []*ec2Types.Address
  1339. for adds := range addressCh {
  1340. for _, add := range adds.Addresses {
  1341. a := add // duplicate to avoid pointer to iterator
  1342. addresses = append(addresses, &a)
  1343. }
  1344. }
  1345. var errs []error
  1346. for err := range errorCh {
  1347. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1348. errs = append(errs, err)
  1349. }
  1350. // Return error if no addresses are returned
  1351. if len(errs) > 0 && len(addresses) == 0 {
  1352. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errs), errs)
  1353. }
  1354. // Format the response this way to match the JSON-encoded formatting of a single response
  1355. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1356. // a "Addresss" key at the top level.
  1357. return json.Marshal(map[string][]*ec2Types.Address{
  1358. "Addresses": addresses,
  1359. })
  1360. }
  1361. func (aws *AWS) getDisksForRegion(ctx context.Context, region string, maxResults int32, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1362. aak, err := aws.GetAWSAccessKey()
  1363. if err != nil {
  1364. return nil, err
  1365. }
  1366. cfg, err := aak.CreateConfig(region)
  1367. if err != nil {
  1368. return nil, err
  1369. }
  1370. cli := ec2.NewFromConfig(cfg)
  1371. return cli.DescribeVolumes(ctx, &ec2.DescribeVolumesInput{
  1372. MaxResults: &maxResults,
  1373. NextToken: nextToken,
  1374. })
  1375. }
  1376. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1377. func (aws *AWS) GetDisks() ([]byte, error) {
  1378. aws.ConfigureAuth() // load authentication data into env vars
  1379. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1380. errorCh := make(chan error, len(awsRegions))
  1381. var wg sync.WaitGroup
  1382. wg.Add(len(awsRegions))
  1383. // Get volumes from each AWS region
  1384. for _, r := range awsRegions {
  1385. // Fetch volume response and send results and errors to their
  1386. // respective channels
  1387. go func(region string) {
  1388. defer wg.Done()
  1389. defer errs.HandlePanic()
  1390. // Query for first page of volume results
  1391. resp, err := aws.getDisksForRegion(context.TODO(), region, 1000, nil)
  1392. if err != nil {
  1393. errorCh <- err
  1394. return
  1395. }
  1396. volumeCh <- resp
  1397. // A NextToken indicates more pages of results. Keep querying
  1398. // until all pages are retrieved.
  1399. for resp.NextToken != nil {
  1400. resp, err = aws.getDisksForRegion(context.TODO(), region, 100, resp.NextToken)
  1401. if err != nil {
  1402. errorCh <- err
  1403. return
  1404. }
  1405. volumeCh <- resp
  1406. }
  1407. }(r)
  1408. }
  1409. // Close the result channels after everything has been sent
  1410. go func() {
  1411. defer errs.HandlePanic()
  1412. wg.Wait()
  1413. close(errorCh)
  1414. close(volumeCh)
  1415. }()
  1416. var volumes []*ec2Types.Volume
  1417. for vols := range volumeCh {
  1418. for _, vol := range vols.Volumes {
  1419. v := vol // duplicate to avoid pointer to iterator
  1420. volumes = append(volumes, &v)
  1421. }
  1422. }
  1423. var errs []error
  1424. for err := range errorCh {
  1425. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1426. errs = append(errs, err)
  1427. }
  1428. // Return error if no volumes are returned
  1429. if len(errs) > 0 && len(volumes) == 0 {
  1430. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errs), errs)
  1431. }
  1432. // Format the response this way to match the JSON-encoded formatting of a single response
  1433. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1434. // a "Volumes" key at the top level.
  1435. return json.Marshal(map[string][]*ec2Types.Volume{
  1436. "Volumes": volumes,
  1437. })
  1438. }
  1439. func (*AWS) GetOrphanedResources() ([]OrphanedResource, error) {
  1440. return nil, errors.New("not implemented")
  1441. }
  1442. // QueryAthenaPaginated executes athena query and processes results.
  1443. func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
  1444. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  1445. if err != nil {
  1446. return err
  1447. }
  1448. if awsAthenaInfo.AthenaDatabase == "" || awsAthenaInfo.AthenaTable == "" || awsAthenaInfo.AthenaRegion == "" ||
  1449. awsAthenaInfo.AthenaBucketName == "" || awsAthenaInfo.AccountID == "" {
  1450. return fmt.Errorf("QueryAthenaPaginated: athena configuration incomplete")
  1451. }
  1452. queryExecutionCtx := &athenaTypes.QueryExecutionContext{
  1453. Database: awsSDK.String(awsAthenaInfo.AthenaDatabase),
  1454. }
  1455. resultConfiguration := &athenaTypes.ResultConfiguration{
  1456. OutputLocation: awsSDK.String(awsAthenaInfo.AthenaBucketName),
  1457. }
  1458. startQueryExecutionInput := &athena.StartQueryExecutionInput{
  1459. QueryString: awsSDK.String(query),
  1460. QueryExecutionContext: queryExecutionCtx,
  1461. ResultConfiguration: resultConfiguration,
  1462. }
  1463. // Only set if there is a value, the default input is nil which defaults to the 'primary' workgroup
  1464. if awsAthenaInfo.AthenaWorkgroup != "" {
  1465. startQueryExecutionInput.WorkGroup = awsSDK.String(awsAthenaInfo.AthenaWorkgroup)
  1466. }
  1467. // Create Athena Client
  1468. cfg, err := awsAthenaInfo.CreateConfig()
  1469. if err != nil {
  1470. log.Errorf("Could not retrieve Athena Configuration: %s", err.Error())
  1471. }
  1472. cli := athena.NewFromConfig(cfg)
  1473. // Query Athena
  1474. startQueryExecutionOutput, err := cli.StartQueryExecution(ctx, startQueryExecutionInput)
  1475. if err != nil {
  1476. return fmt.Errorf("QueryAthenaPaginated: start query error: %s", err.Error())
  1477. }
  1478. err = waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
  1479. if err != nil {
  1480. return fmt.Errorf("QueryAthenaPaginated: query execution error: %s", err.Error())
  1481. }
  1482. queryResultsInput := &athena.GetQueryResultsInput{
  1483. QueryExecutionId: startQueryExecutionOutput.QueryExecutionId,
  1484. }
  1485. getQueryResultsPaginator := athena.NewGetQueryResultsPaginator(cli, queryResultsInput)
  1486. for getQueryResultsPaginator.HasMorePages() {
  1487. pg, err := getQueryResultsPaginator.NextPage(ctx)
  1488. if err != nil {
  1489. log.Errorf("QueryAthenaPaginated: NextPage error: %s", err.Error())
  1490. continue
  1491. }
  1492. fn(pg)
  1493. }
  1494. return nil
  1495. }
  1496. func waitForQueryToComplete(ctx context.Context, client *athena.Client, queryExecutionID *string) error {
  1497. inp := &athena.GetQueryExecutionInput{
  1498. QueryExecutionId: queryExecutionID,
  1499. }
  1500. isQueryStillRunning := true
  1501. for isQueryStillRunning {
  1502. qe, err := client.GetQueryExecution(ctx, inp)
  1503. if err != nil {
  1504. return err
  1505. }
  1506. if qe.QueryExecution.Status.State == "SUCCEEDED" {
  1507. isQueryStillRunning = false
  1508. continue
  1509. }
  1510. if qe.QueryExecution.Status.State != "RUNNING" && qe.QueryExecution.Status.State != "QUEUED" {
  1511. return fmt.Errorf("no query results available for query %s", *queryExecutionID)
  1512. }
  1513. time.Sleep(2 * time.Second)
  1514. }
  1515. return nil
  1516. }
  // SavingsPlanData records one resource covered by an AWS Savings Plan, as
// read from a single row of the Athena query in GetSavingsPlanDataFromAthena.
type SavingsPlanData struct {
	// ResourceID is the line_item_resource_id of the covered resource.
	ResourceID string
	// EffectiveCost is the parsed savings_plan_savings_plan_rate value.
	EffectiveCost float64
	// SavingsPlanARN and MostRecentDate hold savings_plan_savings_plan_a_r_n
	// and line_item_usage_start_date, respectively.
	SavingsPlanARN, MostRecentDate string
}
  1523. func (aws *AWS) GetSavingsPlanDataFromAthena() error {
  1524. cfg, err := aws.GetConfig()
  1525. if err != nil {
  1526. aws.RIPricingError = err
  1527. return err
  1528. }
  1529. if cfg.AthenaBucketName == "" {
  1530. err = fmt.Errorf("No Athena Bucket configured")
  1531. aws.RIPricingError = err
  1532. return err
  1533. }
  1534. if aws.SavingsPlanDataByInstanceID == nil {
  1535. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1536. }
  1537. tNow := time.Now()
  1538. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1539. start := tOneDayAgo.Format("2006-01-02")
  1540. end := tNow.Format("2006-01-02")
  1541. // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
  1542. //
  1543. q := `SELECT
  1544. line_item_usage_start_date,
  1545. savings_plan_savings_plan_a_r_n,
  1546. line_item_resource_id,
  1547. savings_plan_savings_plan_rate
  1548. FROM %s as cost_data
  1549. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1550. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1551. line_item_usage_start_date DESC`
  1552. page := 0
  1553. processResults := func(op *athena.GetQueryResultsOutput) bool {
  1554. if op == nil {
  1555. log.Errorf("GetSavingsPlanDataFromAthena: Athena page is nil")
  1556. return false
  1557. } else if op.ResultSet == nil {
  1558. log.Errorf("GetSavingsPlanDataFromAthena: Athena page.ResultSet is nil")
  1559. return false
  1560. }
  1561. aws.SavingsPlanDataLock.Lock()
  1562. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
  1563. mostRecentDate := ""
  1564. iter := op.ResultSet.Rows
  1565. if page == 0 && len(iter) > 0 {
  1566. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1567. }
  1568. page++
  1569. for _, r := range iter {
  1570. d := *r.Data[0].VarCharValue
  1571. if mostRecentDate == "" {
  1572. mostRecentDate = d
  1573. } else if mostRecentDate != d { // Get all most recent assignments
  1574. break
  1575. }
  1576. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1577. if err != nil {
  1578. log.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1579. }
  1580. r := &SavingsPlanData{
  1581. ResourceID: *r.Data[2].VarCharValue,
  1582. EffectiveCost: cost,
  1583. SavingsPlanARN: *r.Data[1].VarCharValue,
  1584. MostRecentDate: d,
  1585. }
  1586. aws.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1587. }
  1588. log.Debugf("Found %d savings plan applied instances", len(aws.SavingsPlanDataByInstanceID))
  1589. for k, r := range aws.SavingsPlanDataByInstanceID {
  1590. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1591. }
  1592. aws.SavingsPlanDataLock.Unlock()
  1593. return true
  1594. }
  1595. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1596. log.Debugf("Running Query: %s", query)
  1597. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  1598. if err != nil {
  1599. aws.RIPricingError = err
  1600. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1601. }
  1602. return nil
  1603. }
  // RIData records one resource billed against an AWS reservation, as read
// from a single row of the Athena query in GetReservationDataFromAthena.
type RIData struct {
	// ResourceID is the line_item_resource_id of the reserved resource.
	ResourceID string
	// EffectiveCost is the parsed reservation_effective_cost value.
	EffectiveCost float64
	// ReservationARN and MostRecentDate hold reservation_reservation_a_r_n
	// and line_item_usage_start_date, respectively.
	ReservationARN, MostRecentDate string
}
  1610. func (aws *AWS) GetReservationDataFromAthena() error {
  1611. cfg, err := aws.GetConfig()
  1612. if err != nil {
  1613. aws.RIPricingError = err
  1614. return err
  1615. }
  1616. if cfg.AthenaBucketName == "" {
  1617. err = fmt.Errorf("No Athena Bucket configured")
  1618. aws.RIPricingError = err
  1619. return err
  1620. }
  1621. // Query for all column names in advance in order to validate configured
  1622. // label columns
  1623. columns, _ := aws.fetchColumns()
  1624. if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
  1625. err = fmt.Errorf("no reservation data available in Athena")
  1626. aws.RIPricingError = err
  1627. return err
  1628. }
  1629. if aws.RIPricingByInstanceID == nil {
  1630. aws.RIPricingByInstanceID = make(map[string]*RIData)
  1631. }
  1632. tNow := time.Now()
  1633. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1634. start := tOneDayAgo.Format("2006-01-02")
  1635. end := tNow.Format("2006-01-02")
  1636. q := `SELECT
  1637. line_item_usage_start_date,
  1638. reservation_reservation_a_r_n,
  1639. line_item_resource_id,
  1640. reservation_effective_cost
  1641. FROM %s as cost_data
  1642. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1643. AND reservation_reservation_a_r_n <> '' ORDER BY
  1644. line_item_usage_start_date DESC`
  1645. page := 0
  1646. processResults := func(op *athena.GetQueryResultsOutput) bool {
  1647. if op == nil {
  1648. log.Errorf("GetReservationDataFromAthena: Athena page is nil")
  1649. return false
  1650. } else if op.ResultSet == nil {
  1651. log.Errorf("GetReservationDataFromAthena: Athena page.ResultSet is nil")
  1652. return false
  1653. }
  1654. aws.RIDataLock.Lock()
  1655. aws.RIPricingByInstanceID = make(map[string]*RIData) // Clean out the old data and only report a RI price if its in the most recent run.
  1656. mostRecentDate := ""
  1657. iter := op.ResultSet.Rows
  1658. if page == 0 && len(iter) > 0 {
  1659. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1660. }
  1661. page++
  1662. for _, r := range iter {
  1663. d := *r.Data[0].VarCharValue
  1664. if mostRecentDate == "" {
  1665. mostRecentDate = d
  1666. } else if mostRecentDate != d { // Get all most recent assignments
  1667. break
  1668. }
  1669. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1670. if err != nil {
  1671. log.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1672. }
  1673. r := &RIData{
  1674. ResourceID: *r.Data[2].VarCharValue,
  1675. EffectiveCost: cost,
  1676. ReservationARN: *r.Data[1].VarCharValue,
  1677. MostRecentDate: d,
  1678. }
  1679. aws.RIPricingByInstanceID[r.ResourceID] = r
  1680. }
  1681. log.Debugf("Found %d reserved instances", len(aws.RIPricingByInstanceID))
  1682. for k, r := range aws.RIPricingByInstanceID {
  1683. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1684. }
  1685. aws.RIDataLock.Unlock()
  1686. return true
  1687. }
  1688. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1689. log.Debugf("Running Query: %s", query)
  1690. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  1691. if err != nil {
  1692. aws.RIPricingError = err
  1693. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1694. }
  1695. aws.RIPricingError = nil
  1696. return nil
  1697. }
  1698. // fetchColumns returns a list of the names of all columns in the configured
  1699. // Athena tables
  1700. func (aws *AWS) fetchColumns() (map[string]bool, error) {
  1701. columnSet := map[string]bool{}
  1702. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  1703. if err != nil {
  1704. return nil, err
  1705. }
  1706. // This Query is supported by Athena tables and views
  1707. q := `SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'`
  1708. query := fmt.Sprintf(q, awsAthenaInfo.AthenaDatabase, awsAthenaInfo.AthenaTable)
  1709. pageNum := 0
  1710. athenaErr := aws.QueryAthenaPaginated(context.TODO(), query, func(page *athena.GetQueryResultsOutput) bool {
  1711. if page == nil {
  1712. log.Errorf("fetchColumns: Athena page is nil")
  1713. return false
  1714. } else if page.ResultSet == nil {
  1715. log.Errorf("fetchColumns: Athena page.ResultSet is nil")
  1716. return false
  1717. }
  1718. // remove header row 'column_name'
  1719. rows := page.ResultSet.Rows[1:]
  1720. for _, row := range rows {
  1721. columnSet[*row.Data[0].VarCharValue] = true
  1722. }
  1723. pageNum++
  1724. return true
  1725. })
  1726. if athenaErr != nil {
  1727. return columnSet, athenaErr
  1728. }
  1729. if len(columnSet) == 0 {
  1730. log.Infof("No columns retrieved from Athena")
  1731. }
  1732. return columnSet, nil
  1733. }
  // spotInfo is one record of the EC2 spot instance data feed. parseSpotData
// decodes it from a tab-separated line.
//
// The field order is significant. parseSpotData derives the column header,
// and therefore the expected number of fields per record, from this struct
// via csvutil.Header. Decoding then maps columns to fields by position. Do
// not reorder, add or remove fields unless the feed format changes.
type spotInfo struct {
	Timestamp   string `csv:"Timestamp"`
	UsageType   string `csv:"UsageType"`
	Operation   string `csv:"Operation"`
	InstanceID  string `csv:"InstanceID"`
	MyBidID     string `csv:"MyBidID"`
	MyMaxPrice  string `csv:"MyMaxPrice"`
	MarketPrice string `csv:"MarketPrice"`
	Charge      string `csv:"Charge"`
	Version     string `csv:"Version"`
}
  1745. func (aws *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
  1746. aws.ConfigureAuth() // configure aws api authentication by setting env vars
  1747. s3Prefix := projectID
  1748. if len(prefix) != 0 {
  1749. s3Prefix = prefix + "/" + s3Prefix
  1750. }
  1751. aak, err := aws.GetAWSAccessKey()
  1752. if err != nil {
  1753. return nil, err
  1754. }
  1755. cfg, err := aak.CreateConfig(region)
  1756. if err != nil {
  1757. return nil, err
  1758. }
  1759. cli := s3.NewFromConfig(cfg)
  1760. downloader := manager.NewDownloader(cli)
  1761. tNow := time.Now()
  1762. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1763. ls := &s3.ListObjectsInput{
  1764. Bucket: awsSDK.String(bucket),
  1765. Prefix: awsSDK.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1766. }
  1767. ls2 := &s3.ListObjectsInput{
  1768. Bucket: awsSDK.String(bucket),
  1769. Prefix: awsSDK.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1770. }
  1771. lso, err := cli.ListObjects(context.TODO(), ls)
  1772. if err != nil {
  1773. aws.serviceAccountChecks.set("bucketList", &ServiceAccountCheck{
  1774. Message: "Bucket List Permissions Available",
  1775. Status: false,
  1776. AdditionalInfo: err.Error(),
  1777. })
  1778. return nil, err
  1779. } else {
  1780. aws.serviceAccountChecks.set("bucketList", &ServiceAccountCheck{
  1781. Message: "Bucket List Permissions Available",
  1782. Status: true,
  1783. })
  1784. }
  1785. lsoLen := len(lso.Contents)
  1786. log.Debugf("Found %d spot data files from yesterday", lsoLen)
  1787. if lsoLen == 0 {
  1788. log.Debugf("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1789. }
  1790. lso2, err := cli.ListObjects(context.TODO(), ls2)
  1791. if err != nil {
  1792. return nil, err
  1793. }
  1794. lso2Len := len(lso2.Contents)
  1795. log.Debugf("Found %d spot data files from today", lso2Len)
  1796. if lso2Len == 0 {
  1797. log.Debugf("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1798. }
  1799. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1800. var keys []*string
  1801. for _, obj := range lso.Contents {
  1802. keys = append(keys, obj.Key)
  1803. }
  1804. for _, obj := range lso2.Contents {
  1805. keys = append(keys, obj.Key)
  1806. }
  1807. header, err := csvutil.Header(spotInfo{}, "csv")
  1808. if err != nil {
  1809. return nil, err
  1810. }
  1811. fieldsPerRecord := len(header)
  1812. spots := make(map[string]*spotInfo)
  1813. for _, key := range keys {
  1814. getObj := &s3.GetObjectInput{
  1815. Bucket: awsSDK.String(bucket),
  1816. Key: key,
  1817. }
  1818. buf := manager.NewWriteAtBuffer([]byte{})
  1819. _, err := downloader.Download(context.TODO(), buf, getObj)
  1820. if err != nil {
  1821. aws.serviceAccountChecks.set("objectList", &ServiceAccountCheck{
  1822. Message: "Object Get Permissions Available",
  1823. Status: false,
  1824. AdditionalInfo: err.Error(),
  1825. })
  1826. return nil, err
  1827. } else {
  1828. aws.serviceAccountChecks.set("objectList", &ServiceAccountCheck{
  1829. Message: "Object Get Permissions Available",
  1830. Status: true,
  1831. })
  1832. }
  1833. r := bytes.NewReader(buf.Bytes())
  1834. gr, err := gzip.NewReader(r)
  1835. if err != nil {
  1836. return nil, err
  1837. }
  1838. csvReader := csv.NewReader(gr)
  1839. csvReader.Comma = '\t'
  1840. csvReader.FieldsPerRecord = fieldsPerRecord
  1841. dec, err := csvutil.NewDecoder(csvReader, header...)
  1842. if err != nil {
  1843. return nil, err
  1844. }
  1845. var foundVersion string
  1846. for {
  1847. spot := spotInfo{}
  1848. err := dec.Decode(&spot)
  1849. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1850. if err == io.EOF {
  1851. break
  1852. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1853. rec := dec.Record()
  1854. // the first two "Record()" will be the comment lines
  1855. // and they show up as len() == 1
  1856. // the first of which is "#Version"
  1857. // the second of which is "#Fields: "
  1858. if len(rec) != 1 {
  1859. log.Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1860. continue
  1861. }
  1862. if len(foundVersion) == 0 {
  1863. spotFeedVersion := rec[0]
  1864. log.Debugf("Spot feed version is \"%s\"", spotFeedVersion)
  1865. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1866. if matches != nil {
  1867. foundVersion = matches[1]
  1868. if foundVersion != supportedSpotFeedVersion {
  1869. log.Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1870. break
  1871. }
  1872. }
  1873. continue
  1874. } else if strings.Index(rec[0], "#") == 0 {
  1875. continue
  1876. } else {
  1877. log.Infof("skipping non-TSV line: %s", rec)
  1878. continue
  1879. }
  1880. } else if err != nil {
  1881. log.Warnf("Error during spot info decode: %+v", err)
  1882. continue
  1883. }
  1884. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  1885. spots[spot.InstanceID] = &spot
  1886. }
  1887. gr.Close()
  1888. }
  1889. return spots, nil
  1890. }
  1891. // ApplyReservedInstancePricing TODO
  1892. func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1893. }
  1894. func (aws *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  1895. return aws.serviceAccountChecks.getStatus()
  1896. }
  1897. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  1898. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  1899. }
  1900. // Regions returns a predefined list of AWS regions
  1901. func (aws *AWS) Regions() []string {
  1902. return awsRegions
  1903. }