awsprovider.go 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127
  1. package cloud
  import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	awsSDK "github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/athena"
	athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
	"github.com/aws/aws-sdk-go-v2/service/ec2"
	ec2Types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/sts"
	"github.com/jszwec/csvutil"
	"github.com/opencost/opencost/pkg/clustercache"
	"github.com/opencost/opencost/pkg/env"
	errs "github.com/opencost/opencost/pkg/errors"
	"github.com/opencost/opencost/pkg/kubecost"
	"github.com/opencost/opencost/pkg/log"
	"github.com/opencost/opencost/pkg/util"
	"github.com/opencost/opencost/pkg/util/fileutil"
	"github.com/opencost/opencost/pkg/util/json"
	v1 "k8s.io/api/core/v1"
)
  // Identifiers and display names used throughout the AWS provider.
const (
	// supportedSpotFeedVersion is the spot data feed major version this
	// provider accepts (presumably compared with the group captured by
	// versionRx — TODO confirm at the call site).
	supportedSpotFeedVersion = "1"

	// Payload selectors accepted by UpdateConfig.
	SpotInfoUpdateType   = "spotinfo"
	AthenaInfoUpdateType = "athenainfo"

	// PreemptibleType is the fourth comma-separated field of the pricing key
	// of a spot node (see awsKey.Features and isPreemptible).
	PreemptibleType = "preemptible"

	// Display names of pricing sources. SpotPricingSource and
	// ReservedInstancePricingSource key the map built by PricingSourceStatus.
	APIPricingSource              = "Public API"
	SpotPricingSource             = "Spot Data Feed"
	ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
)

// Compiled-once regular expressions.
var (
	// provIdRx parses provider IDs of the form
	// aws:///us-east-2a/i-0fea4fd46592d050b: group 1 is the availability zone
	// and group 2 the instance ID (i-0fea4fd46592d050b), if it exists.
	provIdRx = regexp.MustCompile("aws:///([^/]+)/([^/]+)")

	// usageTypeRegx captures in group 2 the "EBS..." tail of a usage type,
	// whether it starts the string or follows a '-' (e.g. "USE2-EBS:...").
	usageTypeRegx = regexp.MustCompile(".*(-|^)(EBS.+)")

	// versionRx matches a "#Version: X.Y" header line and captures X.
	versionRx = regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
)
  53. func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
  54. sources := make(map[string]*PricingSource)
  55. sps := &PricingSource{
  56. Name: SpotPricingSource,
  57. Enabled: true,
  58. }
  59. if !aws.SpotRefreshEnabled() {
  60. sps.Available = false
  61. sps.Error = "Spot instances not set up"
  62. sps.Enabled = false
  63. } else {
  64. sps.Error = ""
  65. if aws.SpotPricingError != nil {
  66. sps.Error = aws.SpotPricingError.Error()
  67. }
  68. if sps.Error != "" {
  69. sps.Available = false
  70. } else if len(aws.SpotPricingByInstanceID) > 0 {
  71. sps.Available = true
  72. } else {
  73. sps.Error = "No spot instances detected"
  74. }
  75. }
  76. sources[SpotPricingSource] = sps
  77. rps := &PricingSource{
  78. Name: ReservedInstancePricingSource,
  79. Enabled: true,
  80. }
  81. rps.Error = ""
  82. if aws.RIPricingError != nil {
  83. rps.Error = aws.RIPricingError.Error()
  84. }
  85. if rps.Error != "" {
  86. rps.Available = false
  87. } else {
  88. rps.Available = true
  89. }
  90. sources[ReservedInstancePricingSource] = rps
  91. return sources
  92. }
  // SpotRefreshDuration is how often spot data is refreshed.
const SpotRefreshDuration = 15 * time.Minute

// awsRegions lists the AWS region identifiers known to this provider, in a
// fixed order.
var awsRegions = []string{
	// United States
	"us-east-2", "us-east-1", "us-west-1", "us-west-2",
	// Asia Pacific
	"ap-east-1", "ap-south-1", "ap-northeast-3", "ap-northeast-2",
	"ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "ap-southeast-3",
	// Canada
	"ca-central-1",
	// China
	"cn-north-1", "cn-northwest-1",
	// Europe
	"eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-north-1", "eu-south-1",
	// Middle East, South America, Africa
	"me-south-1", "sa-east-1", "af-south-1",
	// GovCloud
	"us-gov-east-1", "us-gov-west-1",
}
  123. // AWS represents an Amazon Provider
  124. type AWS struct {
  125. Pricing map[string]*AWSProductTerms
  126. SpotPricingByInstanceID map[string]*spotInfo
  127. SpotPricingUpdatedAt *time.Time
  128. SpotRefreshRunning bool
  129. SpotPricingLock sync.RWMutex
  130. SpotPricingError error
  131. RIPricingByInstanceID map[string]*RIData
  132. RIPricingError error
  133. RIDataRunning bool
  134. RIDataLock sync.RWMutex
  135. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  136. SavingsPlanDataRunning bool
  137. SavingsPlanDataLock sync.RWMutex
  138. ValidPricingKeys map[string]bool
  139. Clientset clustercache.ClusterCache
  140. BaseCPUPrice string
  141. BaseRAMPrice string
  142. BaseGPUPrice string
  143. BaseSpotCPUPrice string
  144. BaseSpotRAMPrice string
  145. BaseSpotGPUPrice string
  146. SpotLabelName string
  147. SpotLabelValue string
  148. SpotDataRegion string
  149. SpotDataBucket string
  150. SpotDataPrefix string
  151. ProjectID string
  152. DownloadPricingDataLock sync.RWMutex
  153. Config *ProviderConfig
  154. serviceAccountChecks *ServiceAccountChecks
  155. clusterManagementPrice float64
  156. clusterAccountId string
  157. clusterRegion string
  158. clusterProvisioner string
  159. *CustomProvider
  160. }
  161. // AWSAccessKey holds AWS credentials and fulfils the awsV2.CredentialsProvider interface
  162. type AWSAccessKey struct {
  163. AccessKeyID string `json:"aws_access_key_id"`
  164. SecretAccessKey string `json:"aws_secret_access_key"`
  165. }
  166. // Retrieve returns a set of awsV2 credentials using the AWSAccessKey's key and secret.
  167. // This fulfils the awsV2.CredentialsProvider interface contract.
  168. func (accessKey AWSAccessKey) Retrieve(ctx context.Context) (awsSDK.Credentials, error) {
  169. return awsSDK.Credentials{
  170. AccessKeyID: accessKey.AccessKeyID,
  171. SecretAccessKey: accessKey.SecretAccessKey,
  172. }, nil
  173. }
  174. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains for the provided region
  175. func (accessKey AWSAccessKey) CreateConfig(region string) (awsSDK.Config, error) {
  176. var cfg awsSDK.Config
  177. var err error
  178. // If accessKey values have not been provided, attempt to load cfg from service key annotations
  179. if accessKey.AccessKeyID == "" && accessKey.SecretAccessKey == "" {
  180. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithRegion(region))
  181. if err != nil {
  182. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region from annotation %s: %s", region, err)
  183. }
  184. } else {
  185. // The AWS SDK v2 requires an object fulfilling the CredentialsProvider interface, which cloud.AWSAccessKey does
  186. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithCredentialsProvider(accessKey), config.WithRegion(region))
  187. if err != nil {
  188. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region %s: %s", region, err)
  189. }
  190. }
  191. return cfg, nil
  192. }
  // AWSPricing is the top-level shape of the AWS pricing index JSON. It is used
// to map a k8s node to an AWS Pricing "product".
type AWSPricing struct {
	Products map[string]*AWSProduct `json:"products"`
	Terms    AWSPricingTerms        `json:"terms"`
}

// AWSProduct represents a purchased SKU.
type AWSProduct struct {
	Sku        string               `json:"sku"`
	Attributes AWSProductAttributes `json:"attributes"`
}

// AWSProductAttributes represents metadata about the product used to map to a node.
type AWSProductAttributes struct {
	Location        string `json:"location"`
	InstanceType    string `json:"instanceType"`
	Memory          string `json:"memory"`
	Storage         string `json:"storage"`
	VCpu            string `json:"vcpu"`
	UsageType       string `json:"usagetype"`
	OperatingSystem string `json:"operatingSystem"`
	PreInstalledSw  string `json:"preInstalledSw"`
	InstanceFamily  string `json:"instanceFamily"`
	CapacityStatus  string `json:"capacitystatus"`
	GPU             string `json:"gpu"` // GPU represents the number of GPU on the instance
}

// AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot.
// NOTE(review): the outer map key is presumably the product SKU and the inner
// key the offer-term code — confirm against the pricing index format.
type AWSPricingTerms struct {
	OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
	Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
}

// AWSOfferTerm is a sku extension used to pay for the node.
type AWSOfferTerm struct {
	Sku             string                  `json:"sku"`
	PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
}

// String renders the term as "sku:dim1:{...},dim2:{...}". Price dimensions are
// emitted in sorted key order so the output is deterministic; Go map
// iteration order is randomized. A nil term renders as "<nil>".
func (ot *AWSOfferTerm) String() string {
	if ot == nil {
		return "<nil>"
	}
	keys := make([]string, 0, len(ot.PriceDimensions))
	for k := range ot.PriceDimensions {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	strs := make([]string, 0, len(keys))
	for _, k := range keys {
		strs = append(strs, fmt.Sprintf("%s:%s", k, ot.PriceDimensions[k].String()))
	}
	return fmt.Sprintf("%s:%s", ot.Sku, strings.Join(strs, ","))
}

// AWSRateCode encodes data about the price of a product.
type AWSRateCode struct {
	Unit         string          `json:"unit"`
	PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
}

// String renders the rate code as "{unit: U, pricePerUnit: {USD CNY}}".
// A nil rate code renders as "<nil>" instead of panicking.
func (rc *AWSRateCode) String() string {
	if rc == nil {
		return "<nil>"
	}
	return fmt.Sprintf("{unit: %s, pricePerUnit: %v}", rc.Unit, rc.PricePerUnit)
}

// AWSCurrencyCode is the localized currency. (TODO: support non-USD)
type AWSCurrencyCode struct {
	USD string `json:"USD,omitempty"`
	CNY string `json:"CNY,omitempty"`
}
  247. // AWSProductTerms represents the full terms of the product
  248. type AWSProductTerms struct {
  249. Sku string `json:"sku"`
  250. OnDemand *AWSOfferTerm `json:"OnDemand"`
  251. Reserved *AWSOfferTerm `json:"Reserved"`
  252. Memory string `json:"memory"`
  253. Storage string `json:"storage"`
  254. VCpu string `json:"vcpu"`
  255. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  256. PV *PV `json:"pv"`
  257. }
  // ClusterIdEnvVar is the environment variable in which one can manually set
// the ClusterId.
const ClusterIdEnvVar = "AWS_CLUSTER_ID"

// Rate-code suffixes appended to a node SKU to address its offer terms and
// price dimensions. The *Cn variants are presumably the equivalents used by
// the China pricing endpoint (TODO confirm).
const (
	// OnDemandRateCode is appended to a node sku for its on-demand term.
	OnDemandRateCode   = ".JRTCKXETXF"
	OnDemandRateCodeCn = ".99YE2YK9UR"

	// ReservedRateCode is appended to a node sku for its reserved term.
	ReservedRateCode = ".38NPMPTW36"

	// HourlyRateCode is appended to a node sku for its hourly price dimension.
	HourlyRateCode   = ".6YS6EN2CT7"
	HourlyRateCodeCn = ".Q7UJUT2CE6"
)
  // volTypes maps in both directions between AWS pricing UsageTypes and EBS
// volume types, as they would appear in a K8s storage class name and the EC2
// API. Both provisioned-IOPS usage types map to io1, while io1 maps back only
// to "EBS:VolumeUsage.piops".
var volTypes = map[string]string{
	// UsageType -> EBS volume type.
	"EBS:VolumeUsage":        "standard",
	"EBS:VolumeUsage.gp2":    "gp2",
	"EBS:VolumeUsage.gp3":    "gp3",
	"EBS:VolumeUsage.sc1":    "sc1",
	"EBS:VolumeUsage.st1":    "st1",
	"EBS:VolumeUsage.piops":  "io1",
	"EBS:VolumeP-IOPS.piops": "io1",

	// EBS volume type -> UsageType.
	"standard": "EBS:VolumeUsage",
	"gp2":      "EBS:VolumeUsage.gp2",
	"gp3":      "EBS:VolumeUsage.gp3",
	"sc1":      "EBS:VolumeUsage.sc1",
	"st1":      "EBS:VolumeUsage.st1",
	"io1":      "EBS:VolumeUsage.piops",
}
  // locationToRegion maps AWS location names (as they come from Billing /
// pricing data) to region identifiers. KubeAttrConversion uses it to build
// pricing keys; a location missing from this table maps to the empty region.
// This means products from all unmapped locations would share keys, so every
// region in awsRegions should have an entry here.
var locationToRegion = map[string]string{
	// United States
	"US East (Ohio)":          "us-east-2",
	"US East (N. Virginia)":   "us-east-1",
	"US West (N. California)": "us-west-1",
	"US West (Oregon)":        "us-west-2",
	// Asia Pacific
	"Asia Pacific (Hong Kong)":  "ap-east-1",
	"Asia Pacific (Mumbai)":     "ap-south-1",
	"Asia Pacific (Osaka)":      "ap-northeast-3",
	"Asia Pacific (Seoul)":      "ap-northeast-2",
	"Asia Pacific (Singapore)":  "ap-southeast-1",
	"Asia Pacific (Sydney)":     "ap-southeast-2",
	"Asia Pacific (Tokyo)":      "ap-northeast-1",
	"Asia Pacific (Jakarta)":    "ap-southeast-3",
	// Canada
	"Canada (Central)": "ca-central-1",
	// China
	"China (Beijing)": "cn-north-1",
	"China (Ningxia)": "cn-northwest-1",
	// Europe
	"EU (Frankfurt)": "eu-central-1",
	"EU (Ireland)":   "eu-west-1",
	"EU (London)":    "eu-west-2",
	"EU (Paris)":     "eu-west-3",
	"EU (Stockholm)": "eu-north-1",
	"EU (Milan)":     "eu-south-1",
	// Middle East (listed in awsRegions but previously unmapped)
	"Middle East (Bahrain)": "me-south-1",
	// South America, Africa
	"South America (Sao Paulo)": "sa-east-1",
	"Africa (Cape Town)":        "af-south-1",
	// GovCloud
	"AWS GovCloud (US-East)": "us-gov-east-1",
	"AWS GovCloud (US-West)": "us-gov-west-1",
}
  // regionToBillingRegionCode maps region identifiers to the region code AWS
// uses in billing data (presumably the prefix of usage types such as
// "USE2-BoxUsage" — confirm at the call sites). us-east-1 and the China
// regions map to the empty code. Every region in awsRegions has an entry,
// including me-south-1 ("MES1").
var regionToBillingRegionCode = map[string]string{
	// United States
	"us-east-2": "USE2",
	"us-east-1": "",
	"us-west-1": "USW1",
	"us-west-2": "USW2",
	// Asia Pacific
	"ap-east-1":      "APE1",
	"ap-south-1":     "APS3",
	"ap-northeast-3": "APN3",
	"ap-northeast-2": "APN2",
	"ap-southeast-1": "APS1",
	"ap-southeast-2": "APS2",
	"ap-northeast-1": "APN1",
	"ap-southeast-3": "APS4",
	// Canada
	"ca-central-1": "CAN1",
	// China
	"cn-north-1":     "",
	"cn-northwest-1": "",
	// Europe
	"eu-central-1": "EUC1",
	"eu-west-1":    "EU",
	"eu-west-2":    "EUW2",
	"eu-west-3":    "EUW3",
	"eu-north-1":   "EUN1",
	"eu-south-1":   "EUS1",
	// Middle East
	"me-south-1": "MES1",
	// South America, Africa
	"sa-east-1":  "SAE1",
	"af-south-1": "AFS1",
	// GovCloud
	"us-gov-east-1": "UGE1",
	"us-gov-west-1": "UGW1",
}
  342. var loadedAWSSecret bool = false
  343. var awsSecret *AWSAccessKey = nil
  344. func (aws *AWS) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
  345. return ""
  346. }
  347. // KubeAttrConversion maps the k8s labels for region to an aws region
  348. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  349. operatingSystem = strings.ToLower(operatingSystem)
  350. region := locationToRegion[location]
  351. return region + "," + instanceType + "," + operatingSystem
  352. }
  // AwsSpotFeedInfo contains configuration for the spot data feed integration.
// It is the payload decoded by UpdateConfig for SpotInfoUpdateType. Note that
// AccountID is serialized under the "projectID" key.
type AwsSpotFeedInfo struct {
	// Where the spot data feed files live.
	BucketName string `json:"bucketName"`
	Prefix     string `json:"prefix"`
	Region     string `json:"region"`
	AccountID  string `json:"projectID"`

	// Static credentials used to read the feed.
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`

	// Node label name/value that identifies spot nodes.
	SpotLabel      string `json:"spotLabel"`
	SpotLabelValue string `json:"spotLabelValue"`
}
  // AwsAthenaInfo contains configuration for the CUR (Athena) integration. It is
// the payload decoded by UpdateConfig for AthenaInfoUpdateType. Note that
// AccountID is serialized under the "projectID" key.
type AwsAthenaInfo struct {
	// Athena query location.
	AthenaBucketName string `json:"athenaBucketName"`
	AthenaRegion     string `json:"athenaRegion"`
	AthenaDatabase   string `json:"athenaDatabase"`
	AthenaTable      string `json:"athenaTable"`
	AthenaWorkgroup  string `json:"athenaWorkgroup"`

	// Static credentials; both empty means "use the SDK default sources".
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`

	AccountID string `json:"projectID"`

	// MasterPayerARN, when set, is a role assumed via STS (see CreateConfig).
	MasterPayerARN string `json:"masterPayerARN"`
}

// IsEmpty reports whether every field of the config is the empty string.
func (aai *AwsAthenaInfo) IsEmpty() bool {
	fields := [...]string{
		aai.AthenaBucketName,
		aai.AthenaRegion,
		aai.AthenaDatabase,
		aai.AthenaTable,
		aai.AthenaWorkgroup,
		aai.ServiceKeyName,
		aai.ServiceKeySecret,
		aai.AccountID,
		aai.MasterPayerARN,
	}
	for _, field := range fields {
		if field != "" {
			return false
		}
	}
	return true
}
  388. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains
  389. func (aai *AwsAthenaInfo) CreateConfig() (awsSDK.Config, error) {
  390. keyProvider := AWSAccessKey{AccessKeyID: aai.ServiceKeyName, SecretAccessKey: aai.ServiceKeySecret}
  391. cfg, err := keyProvider.CreateConfig(aai.AthenaRegion)
  392. if err != nil {
  393. return cfg, err
  394. }
  395. if aai.MasterPayerARN != "" {
  396. // Create the credentials from AssumeRoleProvider to assume the role
  397. // referenced by the roleARN.
  398. stsSvc := sts.NewFromConfig(cfg)
  399. creds := stscreds.NewAssumeRoleProvider(stsSvc, aai.MasterPayerARN)
  400. cfg.Credentials = awsSDK.NewCredentialsCache(creds)
  401. }
  402. return cfg, nil
  403. }
  404. func (aws *AWS) GetManagementPlatform() (string, error) {
  405. nodes := aws.Clientset.GetAllNodes()
  406. if len(nodes) > 0 {
  407. n := nodes[0]
  408. version := n.Status.NodeInfo.KubeletVersion
  409. if strings.Contains(version, "eks") {
  410. return "eks", nil
  411. }
  412. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  413. return "kops", nil
  414. }
  415. }
  416. return "", nil
  417. }
  418. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  419. c, err := aws.Config.GetCustomPricingData()
  420. if err != nil {
  421. return nil, err
  422. }
  423. if c.Discount == "" {
  424. c.Discount = "0%"
  425. }
  426. if c.NegotiatedDiscount == "" {
  427. c.NegotiatedDiscount = "0%"
  428. }
  429. if c.ShareTenancyCosts == "" {
  430. c.ShareTenancyCosts = defaultShareTenancyCost
  431. }
  432. return c, nil
  433. }
  434. // GetAWSAccessKey generate an AWSAccessKey object from the config
  435. func (aws *AWS) GetAWSAccessKey() (*AWSAccessKey, error) {
  436. config, err := aws.GetConfig()
  437. if err != nil {
  438. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  439. }
  440. err = aws.ConfigureAuthWith(config)
  441. if err != nil {
  442. return nil, fmt.Errorf("error configuring Cloud Provider %s", err)
  443. }
  444. //Look for service key values in env if not present in config
  445. if config.ServiceKeyName == "" {
  446. config.ServiceKeyName = env.GetAWSAccessKeyID()
  447. }
  448. if config.ServiceKeySecret == "" {
  449. config.ServiceKeySecret = env.GetAWSAccessKeySecret()
  450. }
  451. if config.ServiceKeyName == "" && config.ServiceKeySecret == "" {
  452. log.DedupedInfof(1, "missing service key values for AWS cloud integration attempting to use service account integration")
  453. }
  454. return &AWSAccessKey{AccessKeyID: config.ServiceKeyName, SecretAccessKey: config.ServiceKeySecret}, nil
  455. }
  456. // GetAWSAthenaInfo generate an AWSAthenaInfo object from the config
  457. func (aws *AWS) GetAWSAthenaInfo() (*AwsAthenaInfo, error) {
  458. config, err := aws.GetConfig()
  459. if err != nil {
  460. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  461. }
  462. aak, err := aws.GetAWSAccessKey()
  463. if err != nil {
  464. return nil, err
  465. }
  466. return &AwsAthenaInfo{
  467. AthenaBucketName: config.AthenaBucketName,
  468. AthenaRegion: config.AthenaRegion,
  469. AthenaDatabase: config.AthenaDatabase,
  470. AthenaTable: config.AthenaTable,
  471. AthenaWorkgroup: config.AthenaWorkgroup,
  472. ServiceKeyName: aak.AccessKeyID,
  473. ServiceKeySecret: aak.SecretAccessKey,
  474. AccountID: config.AthenaProjectID,
  475. MasterPayerARN: config.MasterPayerARN,
  476. }, nil
  477. }
  478. func (aws *AWS) UpdateConfigFromConfigMap(cm map[string]string) (*CustomPricing, error) {
  479. return aws.Config.UpdateFromMap(cm)
  480. }
  481. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  482. return aws.Config.Update(func(c *CustomPricing) error {
  483. if updateType == SpotInfoUpdateType {
  484. asfi := AwsSpotFeedInfo{}
  485. err := json.NewDecoder(r).Decode(&asfi)
  486. if err != nil {
  487. return err
  488. }
  489. c.ServiceKeyName = asfi.ServiceKeyName
  490. if asfi.ServiceKeySecret != "" {
  491. c.ServiceKeySecret = asfi.ServiceKeySecret
  492. }
  493. c.SpotDataPrefix = asfi.Prefix
  494. c.SpotDataBucket = asfi.BucketName
  495. c.ProjectID = asfi.AccountID
  496. c.SpotDataRegion = asfi.Region
  497. c.SpotLabel = asfi.SpotLabel
  498. c.SpotLabelValue = asfi.SpotLabelValue
  499. } else if updateType == AthenaInfoUpdateType {
  500. aai := AwsAthenaInfo{}
  501. err := json.NewDecoder(r).Decode(&aai)
  502. if err != nil {
  503. return err
  504. }
  505. c.AthenaBucketName = aai.AthenaBucketName
  506. c.AthenaRegion = aai.AthenaRegion
  507. c.AthenaDatabase = aai.AthenaDatabase
  508. c.AthenaTable = aai.AthenaTable
  509. c.AthenaWorkgroup = aai.AthenaWorkgroup
  510. c.ServiceKeyName = aai.ServiceKeyName
  511. if aai.ServiceKeySecret != "" {
  512. c.ServiceKeySecret = aai.ServiceKeySecret
  513. }
  514. if aai.MasterPayerARN != "" {
  515. c.MasterPayerARN = aai.MasterPayerARN
  516. }
  517. c.AthenaProjectID = aai.AccountID
  518. } else {
  519. a := make(map[string]interface{})
  520. err := json.NewDecoder(r).Decode(&a)
  521. if err != nil {
  522. return err
  523. }
  524. for k, v := range a {
  525. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  526. vstr, ok := v.(string)
  527. if ok {
  528. err := SetCustomPricingField(c, kUpper, vstr)
  529. if err != nil {
  530. return err
  531. }
  532. } else {
  533. return fmt.Errorf("type error while updating config for %s", kUpper)
  534. }
  535. }
  536. }
  537. if env.IsRemoteEnabled() {
  538. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  539. if err != nil {
  540. return err
  541. }
  542. }
  543. return nil
  544. })
  545. }
  // awsKey identifies a node for pricing purposes. GetKey fills it from the
// node's labels and the provider's configured spot label name/value.
type awsKey struct {
	SpotLabelName  string
	SpotLabelValue string
	Labels         map[string]string
	ProviderID     string
}

// GPUCount always reports zero GPUs for an AWS node key.
func (k *awsKey) GPUCount() int {
	const noGPUs = 0
	return noGPUs
}

// GPUType always reports an empty GPU type for an AWS node key.
func (k *awsKey) GPUType() string {
	const noGPUType = ""
	return noGPUType
}
  558. func (k *awsKey) ID() string {
  559. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  560. if matchNum == 2 {
  561. return group
  562. }
  563. }
  564. log.Warnf("Could not find instance ID in \"%s\"", k.ProviderID)
  565. return ""
  566. }
  567. func (k *awsKey) Features() string {
  568. instanceType, _ := util.GetInstanceType(k.Labels)
  569. operatingSystem, _ := util.GetOperatingSystem(k.Labels)
  570. region, _ := util.GetRegion(k.Labels)
  571. key := region + "," + instanceType + "," + operatingSystem
  572. usageType := PreemptibleType
  573. spotKey := key + "," + usageType
  574. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  575. return spotKey
  576. }
  577. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  578. return spotKey
  579. }
  580. return key
  581. }
  582. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  583. pricing, ok := aws.Pricing[pvk.Features()]
  584. if !ok {
  585. log.Debugf("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  586. return &PV{}, nil
  587. }
  588. return pricing.PV, nil
  589. }
  // awsPVKey identifies a persistent volume for pricing purposes. GetPVKey
// fills it from the PV and its storage class parameters.
type awsPVKey struct {
	Labels                 map[string]string
	StorageClassParameters map[string]string
	StorageClassName       string
	Name                   string
	// DefaultRegion is used when the PV labels carry no region.
	DefaultRegion string
	// ProviderID is the EBS volume ID / CSI volume handle, if any.
	ProviderID string
}
  598. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  599. providerID := ""
  600. if pv.Spec.AWSElasticBlockStore != nil {
  601. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  602. } else if pv.Spec.CSI != nil {
  603. providerID = pv.Spec.CSI.VolumeHandle
  604. }
  605. return &awsPVKey{
  606. Labels: pv.Labels,
  607. StorageClassName: pv.Spec.StorageClassName,
  608. StorageClassParameters: parameters,
  609. Name: pv.Name,
  610. DefaultRegion: defaultRegion,
  611. ProviderID: providerID,
  612. }
  613. }
  614. func (key *awsPVKey) ID() string {
  615. return key.ProviderID
  616. }
  617. func (key *awsPVKey) GetStorageClass() string {
  618. return key.StorageClassName
  619. }
  620. func (key *awsPVKey) Features() string {
  621. storageClass := key.StorageClassParameters["type"]
  622. if storageClass == "standard" {
  623. storageClass = "gp2"
  624. }
  625. // Storage class names are generally EBS volume types (gp2)
  626. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  627. // Converts between the 2
  628. region, ok := util.GetRegion(key.Labels)
  629. if !ok {
  630. region = key.DefaultRegion
  631. }
  632. class, ok := volTypes[storageClass]
  633. if !ok {
  634. log.Debugf("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  635. }
  636. return region + "," + class
  637. }
  638. // GetKey maps node labels to information needed to retrieve pricing data
  639. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  640. return &awsKey{
  641. SpotLabelName: aws.SpotLabelName,
  642. SpotLabelValue: aws.SpotLabelValue,
  643. Labels: labels,
  644. ProviderID: labels["providerID"],
  645. }
  646. }
  647. func (aws *AWS) isPreemptible(key string) bool {
  648. s := strings.Split(key, ",")
  649. if len(s) == 4 && s[3] == PreemptibleType {
  650. return true
  651. }
  652. return false
  653. }
  654. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  655. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  656. }
  657. // Use the pricing data from the current region. Fall back to using all region data if needed.
  658. func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, error) {
  659. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/"
  660. region := ""
  661. multiregion := false
  662. for _, n := range nodeList {
  663. labels := n.GetLabels()
  664. currentNodeRegion := ""
  665. if r, ok := util.GetRegion(labels); ok {
  666. currentNodeRegion = r
  667. // Switch to Chinese endpoint for regions with the Chinese prefix
  668. if strings.HasPrefix(currentNodeRegion, "cn-") {
  669. pricingURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/AmazonEC2/current/"
  670. }
  671. } else {
  672. multiregion = true // We weren't able to detect the node's region, so pull all data.
  673. break
  674. }
  675. if region == "" { // We haven't set a region yet
  676. region = currentNodeRegion
  677. } else if region != "" && currentNodeRegion != region { // If two nodes have different regions here, we'll need to fetch all pricing data.
  678. multiregion = true
  679. break
  680. }
  681. }
  682. // Chinese multiregion endpoint only contains data for Chinese regions and Chinese regions are excluded from other endpoint
  683. if region != "" && !multiregion {
  684. pricingURL += region + "/"
  685. }
  686. pricingURL += "index.json"
  687. if env.GetAWSPricingURL() != "" { // Allow override of pricing URL
  688. pricingURL = env.GetAWSPricingURL()
  689. }
  690. log.Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  691. resp, err := http.Get(pricingURL)
  692. if err != nil {
  693. log.Errorf("Bogus fetch of \"%s\": %v", pricingURL, err)
  694. return nil, pricingURL, err
  695. }
  696. return resp, pricingURL, err
  697. }
  698. // SpotRefreshEnabled determines whether the required configs to run the spot feed query have been set up
  699. func (aws *AWS) SpotRefreshEnabled() bool {
  700. // Need a valid value for at least one of these fields to consider spot pricing as enabled
  701. return len(aws.SpotDataBucket) != 0 || len(aws.SpotDataRegion) != 0 || len(aws.ProjectID) != 0
  702. }
  703. // DownloadPricingData fetches data from the AWS Pricing API
  704. func (aws *AWS) DownloadPricingData() error {
  705. aws.DownloadPricingDataLock.Lock()
  706. defer aws.DownloadPricingDataLock.Unlock()
  707. c, err := aws.Config.GetCustomPricingData()
  708. if err != nil {
  709. log.Errorf("Error downloading default pricing data: %s", err.Error())
  710. }
  711. aws.BaseCPUPrice = c.CPU
  712. aws.BaseRAMPrice = c.RAM
  713. aws.BaseGPUPrice = c.GPU
  714. aws.BaseSpotCPUPrice = c.SpotCPU
  715. aws.BaseSpotRAMPrice = c.SpotRAM
  716. aws.BaseSpotGPUPrice = c.SpotGPU
  717. aws.SpotLabelName = c.SpotLabel
  718. aws.SpotLabelValue = c.SpotLabelValue
  719. aws.SpotDataBucket = c.SpotDataBucket
  720. aws.SpotDataPrefix = c.SpotDataPrefix
  721. aws.ProjectID = c.ProjectID
  722. aws.SpotDataRegion = c.SpotDataRegion
  723. aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
  724. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  725. log.Warnf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  726. }
  727. nodeList := aws.Clientset.GetAllNodes()
  728. inputkeys := make(map[string]bool)
  729. for _, n := range nodeList {
  730. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  731. aws.clusterManagementPrice = 0.10
  732. aws.clusterProvisioner = "EKS"
  733. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  734. aws.clusterProvisioner = "KOPS"
  735. }
  736. labels := n.GetObjectMeta().GetLabels()
  737. key := aws.GetKey(labels, n)
  738. inputkeys[key.Features()] = true
  739. }
  740. pvList := aws.Clientset.GetAllPersistentVolumes()
  741. storageClasses := aws.Clientset.GetAllStorageClasses()
  742. storageClassMap := make(map[string]map[string]string)
  743. for _, storageClass := range storageClasses {
  744. params := storageClass.Parameters
  745. storageClassMap[storageClass.ObjectMeta.Name] = params
  746. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  747. storageClassMap["default"] = params
  748. storageClassMap[""] = params
  749. }
  750. }
  751. pvkeys := make(map[string]PVKey)
  752. for _, pv := range pvList {
  753. params, ok := storageClassMap[pv.Spec.StorageClassName]
  754. if !ok {
  755. log.Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  756. continue
  757. }
  758. key := aws.GetPVKey(pv, params, "")
  759. pvkeys[key.Features()] = key
  760. }
  761. // RIDataRunning establishes the existence of the goroutine. Since it's possible we
  762. // run multiple downloads, we don't want to create multiple go routines if one already exists
  763. if !aws.RIDataRunning {
  764. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  765. if err != nil {
  766. log.Errorf("Failed to lookup reserved instance data: %s", err.Error())
  767. } else { // If we make one successful run, check on new reservation data every hour
  768. go func() {
  769. defer errs.HandlePanic()
  770. aws.RIDataRunning = true
  771. for {
  772. log.Infof("Reserved Instance watcher running... next update in 1h")
  773. time.Sleep(time.Hour)
  774. err := aws.GetReservationDataFromAthena()
  775. if err != nil {
  776. log.Infof("Error updating RI data: %s", err.Error())
  777. }
  778. }
  779. }()
  780. }
  781. }
  782. if !aws.SavingsPlanDataRunning {
  783. err = aws.GetSavingsPlanDataFromAthena()
  784. if err != nil {
  785. log.Errorf("Failed to lookup savings plan data: %s", err.Error())
  786. } else {
  787. go func() {
  788. defer errs.HandlePanic()
  789. aws.SavingsPlanDataRunning = true
  790. for {
  791. log.Infof("Savings Plan watcher running... next update in 1h")
  792. time.Sleep(time.Hour)
  793. err := aws.GetSavingsPlanDataFromAthena()
  794. if err != nil {
  795. log.Infof("Error updating Savings Plan data: %s", err.Error())
  796. }
  797. }
  798. }()
  799. }
  800. }
  801. aws.Pricing = make(map[string]*AWSProductTerms)
  802. aws.ValidPricingKeys = make(map[string]bool)
  803. skusToKeys := make(map[string]string)
  804. resp, pricingURL, err := aws.getRegionPricing(nodeList)
  805. if err != nil {
  806. return err
  807. }
  808. dec := json.NewDecoder(resp.Body)
  809. for {
  810. t, err := dec.Token()
  811. if err == io.EOF {
  812. log.Infof("done loading \"%s\"\n", pricingURL)
  813. break
  814. } else if err != nil {
  815. log.Errorf("error parsing response json %v", resp.Body)
  816. break
  817. }
  818. if t == "products" {
  819. _, err := dec.Token() // this should parse the opening "{""
  820. if err != nil {
  821. return err
  822. }
  823. for dec.More() {
  824. _, err := dec.Token() // the sku token
  825. if err != nil {
  826. return err
  827. }
  828. product := &AWSProduct{}
  829. err = dec.Decode(&product)
  830. if err != nil {
  831. log.Errorf("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  832. break
  833. }
  834. if product.Attributes.PreInstalledSw == "NA" &&
  835. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
  836. product.Attributes.CapacityStatus == "Used" {
  837. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  838. spotKey := key + ",preemptible"
  839. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  840. productTerms := &AWSProductTerms{
  841. Sku: product.Sku,
  842. Memory: product.Attributes.Memory,
  843. Storage: product.Attributes.Storage,
  844. VCpu: product.Attributes.VCpu,
  845. GPU: product.Attributes.GPU,
  846. }
  847. aws.Pricing[key] = productTerms
  848. aws.Pricing[spotKey] = productTerms
  849. skusToKeys[product.Sku] = key
  850. }
  851. aws.ValidPricingKeys[key] = true
  852. aws.ValidPricingKeys[spotKey] = true
  853. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  854. // UsageTypes may be prefixed with a region code - we're removing this when using
  855. // volTypes to keep lookups generic
  856. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  857. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  858. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  859. spotKey := key + ",preemptible"
  860. pv := &PV{
  861. Class: volTypes[usageTypeNoRegion],
  862. Region: locationToRegion[product.Attributes.Location],
  863. }
  864. productTerms := &AWSProductTerms{
  865. Sku: product.Sku,
  866. PV: pv,
  867. }
  868. aws.Pricing[key] = productTerms
  869. aws.Pricing[spotKey] = productTerms
  870. skusToKeys[product.Sku] = key
  871. aws.ValidPricingKeys[key] = true
  872. aws.ValidPricingKeys[spotKey] = true
  873. }
  874. }
  875. }
  876. if t == "terms" {
  877. _, err := dec.Token() // this should parse the opening "{""
  878. if err != nil {
  879. return err
  880. }
  881. termType, err := dec.Token()
  882. if err != nil {
  883. return err
  884. }
  885. if termType == "OnDemand" {
  886. _, err := dec.Token()
  887. if err != nil { // again, should parse an opening "{"
  888. return err
  889. }
  890. for dec.More() {
  891. sku, err := dec.Token()
  892. if err != nil {
  893. return err
  894. }
  895. _, err = dec.Token() // another opening "{"
  896. if err != nil {
  897. return err
  898. }
  899. skuOnDemand, err := dec.Token()
  900. if err != nil {
  901. return err
  902. }
  903. offerTerm := &AWSOfferTerm{}
  904. err = dec.Decode(&offerTerm)
  905. if err != nil {
  906. log.Errorf("Error decoding AWS Offer Term: " + err.Error())
  907. }
  908. key, ok := skusToKeys[sku.(string)]
  909. spotKey := key + ",preemptible"
  910. if ok {
  911. aws.Pricing[key].OnDemand = offerTerm
  912. aws.Pricing[spotKey].OnDemand = offerTerm
  913. var cost string
  914. if sku.(string)+OnDemandRateCode == skuOnDemand {
  915. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  916. } else if sku.(string)+OnDemandRateCodeCn == skuOnDemand {
  917. cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCodeCn+HourlyRateCodeCn].PricePerUnit.CNY
  918. }
  919. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  920. // If the specific UsageType is the per IO cost used on io1 volumes
  921. // we need to add the per IO cost to the io1 PV cost
  922. // Add the per IO cost to the PV object for the io1 volume type
  923. aws.Pricing[key].PV.CostPerIO = cost
  924. } else if strings.Contains(key, "EBS:Volume") {
  925. // If volume, we need to get hourly cost and add it to the PV object
  926. costFloat, _ := strconv.ParseFloat(cost, 64)
  927. hourlyPrice := costFloat / 730
  928. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  929. }
  930. }
  931. _, err = dec.Token()
  932. if err != nil {
  933. return err
  934. }
  935. }
  936. _, err = dec.Token()
  937. if err != nil {
  938. return err
  939. }
  940. }
  941. }
  942. }
  943. log.Infof("Finished downloading \"%s\"", pricingURL)
  944. if !aws.SpotRefreshEnabled() {
  945. return nil
  946. }
  947. // Always run spot pricing refresh when performing download
  948. aws.refreshSpotPricing(true)
  949. // Only start a single refresh goroutine
  950. if !aws.SpotRefreshRunning {
  951. aws.SpotRefreshRunning = true
  952. go func() {
  953. defer errs.HandlePanic()
  954. for {
  955. log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  956. time.Sleep(SpotRefreshDuration)
  957. // Reoccurring refresh checks update times
  958. aws.refreshSpotPricing(false)
  959. }
  960. }()
  961. }
  962. return nil
  963. }
  964. func (aws *AWS) refreshSpotPricing(force bool) {
  965. aws.SpotPricingLock.Lock()
  966. defer aws.SpotPricingLock.Unlock()
  967. now := time.Now().UTC()
  968. updateTime := now.Add(-SpotRefreshDuration)
  969. // Return if there was an update time set and an hour hasn't elapsed
  970. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  971. return
  972. }
  973. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
  974. if err != nil {
  975. log.Warnf("Skipping AWS spot data download: %s", err.Error())
  976. aws.SpotPricingError = err
  977. return
  978. }
  979. aws.SpotPricingError = nil
  980. // update time last updated
  981. aws.SpotPricingUpdatedAt = &now
  982. aws.SpotPricingByInstanceID = sp
  983. }
  984. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  985. func (aws *AWS) NetworkPricing() (*Network, error) {
  986. cpricing, err := aws.Config.GetCustomPricingData()
  987. if err != nil {
  988. return nil, err
  989. }
  990. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  991. if err != nil {
  992. return nil, err
  993. }
  994. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  995. if err != nil {
  996. return nil, err
  997. }
  998. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  999. if err != nil {
  1000. return nil, err
  1001. }
  1002. return &Network{
  1003. ZoneNetworkEgressCost: znec,
  1004. RegionNetworkEgressCost: rnec,
  1005. InternetNetworkEgressCost: inec,
  1006. }, nil
  1007. }
  1008. func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
  1009. fffrc := 0.025
  1010. afrc := 0.010
  1011. lbidc := 0.008
  1012. numForwardingRules := 1.0
  1013. dataIngressGB := 0.0
  1014. var totalCost float64
  1015. if numForwardingRules < 5 {
  1016. totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
  1017. } else {
  1018. totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
  1019. }
  1020. return &LoadBalancer{
  1021. Cost: totalCost,
  1022. }, nil
  1023. }
  1024. // AllNodePricing returns all the billing data fetched.
  1025. func (aws *AWS) AllNodePricing() (interface{}, error) {
  1026. aws.DownloadPricingDataLock.RLock()
  1027. defer aws.DownloadPricingDataLock.RUnlock()
  1028. return aws.Pricing, nil
  1029. }
  1030. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  1031. aws.SpotPricingLock.RLock()
  1032. defer aws.SpotPricingLock.RUnlock()
  1033. info, ok := aws.SpotPricingByInstanceID[instanceID]
  1034. return info, ok
  1035. }
  1036. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  1037. aws.RIDataLock.RLock()
  1038. defer aws.RIDataLock.RUnlock()
  1039. data, ok := aws.RIPricingByInstanceID[instanceID]
  1040. return data, ok
  1041. }
  1042. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  1043. aws.SavingsPlanDataLock.RLock()
  1044. defer aws.SavingsPlanDataLock.RUnlock()
  1045. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  1046. return data, ok
  1047. }
  1048. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  1049. key := k.Features()
  1050. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  1051. var spotcost string
  1052. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  1053. arr := strings.Split(spotInfo.Charge, " ")
  1054. if len(arr) == 2 {
  1055. spotcost = arr[0]
  1056. } else {
  1057. log.Infof("Spot data for node %s is missing", k.ID())
  1058. }
  1059. return &Node{
  1060. Cost: spotcost,
  1061. VCPU: terms.VCpu,
  1062. RAM: terms.Memory,
  1063. GPU: terms.GPU,
  1064. Storage: terms.Storage,
  1065. BaseCPUPrice: aws.BaseCPUPrice,
  1066. BaseRAMPrice: aws.BaseRAMPrice,
  1067. BaseGPUPrice: aws.BaseGPUPrice,
  1068. UsageType: PreemptibleType,
  1069. }, nil
  1070. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  1071. log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
  1072. return &Node{
  1073. VCPU: terms.VCpu,
  1074. VCPUCost: aws.BaseSpotCPUPrice,
  1075. RAM: terms.Memory,
  1076. GPU: terms.GPU,
  1077. Storage: terms.Storage,
  1078. BaseCPUPrice: aws.BaseCPUPrice,
  1079. BaseRAMPrice: aws.BaseRAMPrice,
  1080. BaseGPUPrice: aws.BaseGPUPrice,
  1081. UsageType: PreemptibleType,
  1082. }, nil
  1083. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  1084. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  1085. return &Node{
  1086. Cost: strCost,
  1087. VCPU: terms.VCpu,
  1088. RAM: terms.Memory,
  1089. GPU: terms.GPU,
  1090. Storage: terms.Storage,
  1091. BaseCPUPrice: aws.BaseCPUPrice,
  1092. BaseRAMPrice: aws.BaseRAMPrice,
  1093. BaseGPUPrice: aws.BaseGPUPrice,
  1094. UsageType: usageType,
  1095. }, nil
  1096. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  1097. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  1098. return &Node{
  1099. Cost: strCost,
  1100. VCPU: terms.VCpu,
  1101. RAM: terms.Memory,
  1102. GPU: terms.GPU,
  1103. Storage: terms.Storage,
  1104. BaseCPUPrice: aws.BaseCPUPrice,
  1105. BaseRAMPrice: aws.BaseRAMPrice,
  1106. BaseGPUPrice: aws.BaseGPUPrice,
  1107. UsageType: usageType,
  1108. }, nil
  1109. }
  1110. var cost string
  1111. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  1112. if ok {
  1113. cost = c.PricePerUnit.USD
  1114. } else {
  1115. // Check for Chinese pricing before throwing error
  1116. c, ok = terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCodeCn+HourlyRateCodeCn]
  1117. if ok {
  1118. cost = c.PricePerUnit.CNY
  1119. } else {
  1120. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  1121. }
  1122. }
  1123. return &Node{
  1124. Cost: cost,
  1125. VCPU: terms.VCpu,
  1126. RAM: terms.Memory,
  1127. GPU: terms.GPU,
  1128. Storage: terms.Storage,
  1129. BaseCPUPrice: aws.BaseCPUPrice,
  1130. BaseRAMPrice: aws.BaseRAMPrice,
  1131. BaseGPUPrice: aws.BaseGPUPrice,
  1132. UsageType: usageType,
  1133. }, nil
  1134. }
  1135. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  1136. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  1137. aws.DownloadPricingDataLock.RLock()
  1138. defer aws.DownloadPricingDataLock.RUnlock()
  1139. key := k.Features()
  1140. usageType := "ondemand"
  1141. if aws.isPreemptible(key) {
  1142. usageType = PreemptibleType
  1143. }
  1144. terms, ok := aws.Pricing[key]
  1145. if ok {
  1146. return aws.createNode(terms, usageType, k)
  1147. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  1148. aws.DownloadPricingDataLock.RUnlock()
  1149. err := aws.DownloadPricingData()
  1150. aws.DownloadPricingDataLock.RLock()
  1151. if err != nil {
  1152. return &Node{
  1153. Cost: aws.BaseCPUPrice,
  1154. BaseCPUPrice: aws.BaseCPUPrice,
  1155. BaseRAMPrice: aws.BaseRAMPrice,
  1156. BaseGPUPrice: aws.BaseGPUPrice,
  1157. UsageType: usageType,
  1158. UsesBaseCPUPrice: true,
  1159. }, err
  1160. }
  1161. terms, termsOk := aws.Pricing[key]
  1162. if !termsOk {
  1163. return &Node{
  1164. Cost: aws.BaseCPUPrice,
  1165. BaseCPUPrice: aws.BaseCPUPrice,
  1166. BaseRAMPrice: aws.BaseRAMPrice,
  1167. BaseGPUPrice: aws.BaseGPUPrice,
  1168. UsageType: usageType,
  1169. UsesBaseCPUPrice: true,
  1170. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  1171. }
  1172. return aws.createNode(terms, usageType, k)
  1173. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  1174. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  1175. }
  1176. }
  1177. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  1178. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  1179. defaultClusterName := "AWS Cluster #1"
  1180. c, err := awsProvider.GetConfig()
  1181. if err != nil {
  1182. return nil, err
  1183. }
  1184. remoteEnabled := env.IsRemoteEnabled()
  1185. makeStructure := func(clusterName string) (map[string]string, error) {
  1186. m := make(map[string]string)
  1187. m["name"] = clusterName
  1188. m["provider"] = kubecost.AWSProvider
  1189. m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable else where
  1190. m["region"] = awsProvider.clusterRegion
  1191. m["id"] = env.GetClusterID()
  1192. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  1193. m["provisioner"] = awsProvider.clusterProvisioner
  1194. return m, nil
  1195. }
  1196. if c.ClusterName != "" {
  1197. return makeStructure(c.ClusterName)
  1198. }
  1199. maybeClusterId := env.GetAWSClusterID()
  1200. if len(maybeClusterId) != 0 {
  1201. log.Infof("Returning \"%s\" as ClusterName", maybeClusterId)
  1202. return makeStructure(maybeClusterId)
  1203. }
  1204. log.Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  1205. return makeStructure(defaultClusterName)
  1206. }
  1207. // updates the authentication to the latest values (via config or secret)
  1208. func (aws *AWS) ConfigureAuth() error {
  1209. c, err := aws.Config.GetCustomPricingData()
  1210. if err != nil {
  1211. log.Errorf("Error downloading default pricing data: %s", err.Error())
  1212. }
  1213. return aws.ConfigureAuthWith(c)
  1214. }
  1215. // updates the authentication to the latest values (via config or secret)
  1216. func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
  1217. accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
  1218. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1219. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1220. if err != nil {
  1221. return err
  1222. }
  1223. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1224. if err != nil {
  1225. return err
  1226. }
  1227. }
  1228. return nil
  1229. }
  1230. // Gets the aws key id and secret
  1231. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  1232. // 1. Check config values first (set from frontend UI)
  1233. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  1234. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1235. Message: "AWS ServiceKey exists",
  1236. Status: true,
  1237. })
  1238. return cp.ServiceKeyName, cp.ServiceKeySecret
  1239. }
  1240. // 2. Check for secret
  1241. s, _ := aws.loadAWSAuthSecret(forceReload)
  1242. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1243. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1244. Message: "AWS ServiceKey exists",
  1245. Status: true,
  1246. })
  1247. return s.AccessKeyID, s.SecretAccessKey
  1248. }
  1249. // 3. Fall back to env vars
  1250. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1251. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1252. Message: "AWS ServiceKey exists",
  1253. Status: false,
  1254. })
  1255. } else {
  1256. aws.serviceAccountChecks.set("hasKey", &ServiceAccountCheck{
  1257. Message: "AWS ServiceKey exists",
  1258. Status: true,
  1259. })
  1260. }
  1261. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1262. }
  1263. // Load once and cache the result (even on failure). This is an install time secret, so
  1264. // we don't expect the secret to change. If it does, however, we can force reload using
  1265. // the input parameter.
  1266. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1267. if !force && loadedAWSSecret {
  1268. return awsSecret, nil
  1269. }
  1270. loadedAWSSecret = true
  1271. exists, err := fileutil.FileExists(authSecretPath)
  1272. if !exists || err != nil {
  1273. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1274. }
  1275. result, err := os.ReadFile(authSecretPath)
  1276. if err != nil {
  1277. return nil, err
  1278. }
  1279. var ak AWSAccessKey
  1280. err = json.Unmarshal(result, &ak)
  1281. if err != nil {
  1282. return nil, err
  1283. }
  1284. awsSecret = &ak
  1285. return awsSecret, nil
  1286. }
  1287. func (aws *AWS) getAddressesForRegion(ctx context.Context, region string) (*ec2.DescribeAddressesOutput, error) {
  1288. aak, err := aws.GetAWSAccessKey()
  1289. if err != nil {
  1290. return nil, err
  1291. }
  1292. cfg, err := aak.CreateConfig(region)
  1293. if err != nil {
  1294. return nil, err
  1295. }
  1296. cli := ec2.NewFromConfig(cfg)
  1297. return cli.DescribeAddresses(ctx, &ec2.DescribeAddressesInput{})
  1298. }
  1299. // GetAddresses retrieves EC2 addresses
  1300. func (aws *AWS) GetAddresses() ([]byte, error) {
  1301. aws.ConfigureAuth() // load authentication data into env vars
  1302. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1303. errorCh := make(chan error, len(awsRegions))
  1304. var wg sync.WaitGroup
  1305. wg.Add(len(awsRegions))
  1306. // Get volumes from each AWS region
  1307. for _, r := range awsRegions {
  1308. // Fetch IP address response and send results and errors to their
  1309. // respective channels
  1310. go func(region string) {
  1311. defer wg.Done()
  1312. defer errs.HandlePanic()
  1313. // Query for first page of volume results
  1314. resp, err := aws.getAddressesForRegion(context.TODO(), region)
  1315. if err != nil {
  1316. errorCh <- err
  1317. return
  1318. }
  1319. addressCh <- resp
  1320. }(r)
  1321. }
  1322. // Close the result channels after everything has been sent
  1323. go func() {
  1324. defer errs.HandlePanic()
  1325. wg.Wait()
  1326. close(errorCh)
  1327. close(addressCh)
  1328. }()
  1329. var addresses []*ec2Types.Address
  1330. for adds := range addressCh {
  1331. for _, add := range adds.Addresses {
  1332. a := add // duplicate to avoid pointer to iterator
  1333. addresses = append(addresses, &a)
  1334. }
  1335. }
  1336. var errs []error
  1337. for err := range errorCh {
  1338. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1339. errs = append(errs, err)
  1340. }
  1341. // Return error if no addresses are returned
  1342. if len(errs) > 0 && len(addresses) == 0 {
  1343. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errs), errs)
  1344. }
  1345. // Format the response this way to match the JSON-encoded formatting of a single response
  1346. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1347. // a "Addresss" key at the top level.
  1348. return json.Marshal(map[string][]*ec2Types.Address{
  1349. "Addresses": addresses,
  1350. })
  1351. }
  1352. func (aws *AWS) getDisksForRegion(ctx context.Context, region string, maxResults int32, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1353. aak, err := aws.GetAWSAccessKey()
  1354. if err != nil {
  1355. return nil, err
  1356. }
  1357. cfg, err := aak.CreateConfig(region)
  1358. if err != nil {
  1359. return nil, err
  1360. }
  1361. cli := ec2.NewFromConfig(cfg)
  1362. return cli.DescribeVolumes(ctx, &ec2.DescribeVolumesInput{
  1363. MaxResults: &maxResults,
  1364. NextToken: nextToken,
  1365. })
  1366. }
  1367. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1368. func (aws *AWS) GetDisks() ([]byte, error) {
  1369. aws.ConfigureAuth() // load authentication data into env vars
  1370. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1371. errorCh := make(chan error, len(awsRegions))
  1372. var wg sync.WaitGroup
  1373. wg.Add(len(awsRegions))
  1374. // Get volumes from each AWS region
  1375. for _, r := range awsRegions {
  1376. // Fetch volume response and send results and errors to their
  1377. // respective channels
  1378. go func(region string) {
  1379. defer wg.Done()
  1380. defer errs.HandlePanic()
  1381. // Query for first page of volume results
  1382. resp, err := aws.getDisksForRegion(context.TODO(), region, 1000, nil)
  1383. if err != nil {
  1384. errorCh <- err
  1385. return
  1386. }
  1387. volumeCh <- resp
  1388. // A NextToken indicates more pages of results. Keep querying
  1389. // until all pages are retrieved.
  1390. for resp.NextToken != nil {
  1391. resp, err = aws.getDisksForRegion(context.TODO(), region, 100, resp.NextToken)
  1392. if err != nil {
  1393. errorCh <- err
  1394. return
  1395. }
  1396. volumeCh <- resp
  1397. }
  1398. }(r)
  1399. }
  1400. // Close the result channels after everything has been sent
  1401. go func() {
  1402. defer errs.HandlePanic()
  1403. wg.Wait()
  1404. close(errorCh)
  1405. close(volumeCh)
  1406. }()
  1407. var volumes []*ec2Types.Volume
  1408. for vols := range volumeCh {
  1409. for _, vol := range vols.Volumes {
  1410. v := vol // duplicate to avoid pointer to iterator
  1411. volumes = append(volumes, &v)
  1412. }
  1413. }
  1414. var errs []error
  1415. for err := range errorCh {
  1416. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1417. errs = append(errs, err)
  1418. }
  1419. // Return error if no volumes are returned
  1420. if len(errs) > 0 && len(volumes) == 0 {
  1421. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errs), errs)
  1422. }
  1423. // Format the response this way to match the JSON-encoded formatting of a single response
  1424. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1425. // a "Volumes" key at the top level.
  1426. return json.Marshal(map[string][]*ec2Types.Volume{
  1427. "Volumes": volumes,
  1428. })
  1429. }
  1430. func (*AWS) GetOrphanedResources() ([]OrphanedResource, error) {
  1431. return nil, errors.New("not implemented")
  1432. }
  1433. // QueryAthenaPaginated executes athena query and processes results.
  1434. func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
  1435. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  1436. if err != nil {
  1437. return err
  1438. }
  1439. if awsAthenaInfo.AthenaDatabase == "" || awsAthenaInfo.AthenaTable == "" || awsAthenaInfo.AthenaRegion == "" ||
  1440. awsAthenaInfo.AthenaBucketName == "" || awsAthenaInfo.AccountID == "" {
  1441. return fmt.Errorf("QueryAthenaPaginated: athena configuration incomplete")
  1442. }
  1443. queryExecutionCtx := &athenaTypes.QueryExecutionContext{
  1444. Database: awsSDK.String(awsAthenaInfo.AthenaDatabase),
  1445. }
  1446. resultConfiguration := &athenaTypes.ResultConfiguration{
  1447. OutputLocation: awsSDK.String(awsAthenaInfo.AthenaBucketName),
  1448. }
  1449. startQueryExecutionInput := &athena.StartQueryExecutionInput{
  1450. QueryString: awsSDK.String(query),
  1451. QueryExecutionContext: queryExecutionCtx,
  1452. ResultConfiguration: resultConfiguration,
  1453. }
  1454. // Only set if there is a value, the default input is nil which defaults to the 'primary' workgroup
  1455. if awsAthenaInfo.AthenaWorkgroup != "" {
  1456. startQueryExecutionInput.WorkGroup = awsSDK.String(awsAthenaInfo.AthenaWorkgroup)
  1457. }
  1458. // Create Athena Client
  1459. cfg, err := awsAthenaInfo.CreateConfig()
  1460. if err != nil {
  1461. log.Errorf("Could not retrieve Athena Configuration: %s", err.Error())
  1462. }
  1463. cli := athena.NewFromConfig(cfg)
  1464. // Query Athena
  1465. startQueryExecutionOutput, err := cli.StartQueryExecution(ctx, startQueryExecutionInput)
  1466. if err != nil {
  1467. return fmt.Errorf("QueryAthenaPaginated: start query error: %s", err.Error())
  1468. }
  1469. err = waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
  1470. if err != nil {
  1471. return fmt.Errorf("QueryAthenaPaginated: query execution error: %s", err.Error())
  1472. }
  1473. queryResultsInput := &athena.GetQueryResultsInput{
  1474. QueryExecutionId: startQueryExecutionOutput.QueryExecutionId,
  1475. }
  1476. getQueryResultsPaginator := athena.NewGetQueryResultsPaginator(cli, queryResultsInput)
  1477. for getQueryResultsPaginator.HasMorePages() {
  1478. pg, err := getQueryResultsPaginator.NextPage(ctx)
  1479. if err != nil {
  1480. log.Errorf("QueryAthenaPaginated: NextPage error: %s", err.Error())
  1481. continue
  1482. }
  1483. fn(pg)
  1484. }
  1485. return nil
  1486. }
  1487. func waitForQueryToComplete(ctx context.Context, client *athena.Client, queryExecutionID *string) error {
  1488. inp := &athena.GetQueryExecutionInput{
  1489. QueryExecutionId: queryExecutionID,
  1490. }
  1491. isQueryStillRunning := true
  1492. for isQueryStillRunning {
  1493. qe, err := client.GetQueryExecution(ctx, inp)
  1494. if err != nil {
  1495. return err
  1496. }
  1497. if qe.QueryExecution.Status.State == "SUCCEEDED" {
  1498. isQueryStillRunning = false
  1499. continue
  1500. }
  1501. if qe.QueryExecution.Status.State != "RUNNING" && qe.QueryExecution.Status.State != "QUEUED" {
  1502. return fmt.Errorf("no query results available for query %s", *queryExecutionID)
  1503. }
  1504. time.Sleep(2 * time.Second)
  1505. }
  1506. return nil
  1507. }
  1508. type SavingsPlanData struct {
  1509. ResourceID string
  1510. EffectiveCost float64
  1511. SavingsPlanARN string
  1512. MostRecentDate string
  1513. }
  1514. func (aws *AWS) GetSavingsPlanDataFromAthena() error {
  1515. cfg, err := aws.GetConfig()
  1516. if err != nil {
  1517. aws.RIPricingError = err
  1518. return err
  1519. }
  1520. if cfg.AthenaBucketName == "" {
  1521. err = fmt.Errorf("No Athena Bucket configured")
  1522. aws.RIPricingError = err
  1523. return err
  1524. }
  1525. if aws.SavingsPlanDataByInstanceID == nil {
  1526. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1527. }
  1528. tNow := time.Now()
  1529. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1530. start := tOneDayAgo.Format("2006-01-02")
  1531. end := tNow.Format("2006-01-02")
  1532. // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
  1533. //
  1534. q := `SELECT
  1535. line_item_usage_start_date,
  1536. savings_plan_savings_plan_a_r_n,
  1537. line_item_resource_id,
  1538. savings_plan_savings_plan_rate
  1539. FROM %s as cost_data
  1540. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1541. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1542. line_item_usage_start_date DESC`
  1543. page := 0
  1544. processResults := func(op *athena.GetQueryResultsOutput) bool {
  1545. if op == nil {
  1546. log.Errorf("GetSavingsPlanDataFromAthena: Athena page is nil")
  1547. return false
  1548. } else if op.ResultSet == nil {
  1549. log.Errorf("GetSavingsPlanDataFromAthena: Athena page.ResultSet is nil")
  1550. return false
  1551. }
  1552. aws.SavingsPlanDataLock.Lock()
  1553. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
  1554. mostRecentDate := ""
  1555. iter := op.ResultSet.Rows
  1556. if page == 0 && len(iter) > 0 {
  1557. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1558. }
  1559. page++
  1560. for _, r := range iter {
  1561. d := *r.Data[0].VarCharValue
  1562. if mostRecentDate == "" {
  1563. mostRecentDate = d
  1564. } else if mostRecentDate != d { // Get all most recent assignments
  1565. break
  1566. }
  1567. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1568. if err != nil {
  1569. log.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1570. }
  1571. r := &SavingsPlanData{
  1572. ResourceID: *r.Data[2].VarCharValue,
  1573. EffectiveCost: cost,
  1574. SavingsPlanARN: *r.Data[1].VarCharValue,
  1575. MostRecentDate: d,
  1576. }
  1577. aws.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1578. }
  1579. log.Debugf("Found %d savings plan applied instances", len(aws.SavingsPlanDataByInstanceID))
  1580. for k, r := range aws.SavingsPlanDataByInstanceID {
  1581. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1582. }
  1583. aws.SavingsPlanDataLock.Unlock()
  1584. return true
  1585. }
  1586. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1587. log.Debugf("Running Query: %s", query)
  1588. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  1589. if err != nil {
  1590. aws.RIPricingError = err
  1591. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1592. }
  1593. return nil
  1594. }
  1595. type RIData struct {
  1596. ResourceID string
  1597. EffectiveCost float64
  1598. ReservationARN string
  1599. MostRecentDate string
  1600. }
  1601. func (aws *AWS) GetReservationDataFromAthena() error {
  1602. cfg, err := aws.GetConfig()
  1603. if err != nil {
  1604. aws.RIPricingError = err
  1605. return err
  1606. }
  1607. if cfg.AthenaBucketName == "" {
  1608. err = fmt.Errorf("No Athena Bucket configured")
  1609. aws.RIPricingError = err
  1610. return err
  1611. }
  1612. // Query for all column names in advance in order to validate configured
  1613. // label columns
  1614. columns, _ := aws.fetchColumns()
  1615. if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
  1616. err = fmt.Errorf("no reservation data available in Athena")
  1617. aws.RIPricingError = err
  1618. return err
  1619. }
  1620. if aws.RIPricingByInstanceID == nil {
  1621. aws.RIPricingByInstanceID = make(map[string]*RIData)
  1622. }
  1623. tNow := time.Now()
  1624. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1625. start := tOneDayAgo.Format("2006-01-02")
  1626. end := tNow.Format("2006-01-02")
  1627. q := `SELECT
  1628. line_item_usage_start_date,
  1629. reservation_reservation_a_r_n,
  1630. line_item_resource_id,
  1631. reservation_effective_cost
  1632. FROM %s as cost_data
  1633. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1634. AND reservation_reservation_a_r_n <> '' ORDER BY
  1635. line_item_usage_start_date DESC`
  1636. page := 0
  1637. processResults := func(op *athena.GetQueryResultsOutput) bool {
  1638. if op == nil {
  1639. log.Errorf("GetReservationDataFromAthena: Athena page is nil")
  1640. return false
  1641. } else if op.ResultSet == nil {
  1642. log.Errorf("GetReservationDataFromAthena: Athena page.ResultSet is nil")
  1643. return false
  1644. }
  1645. aws.RIDataLock.Lock()
  1646. aws.RIPricingByInstanceID = make(map[string]*RIData) // Clean out the old data and only report a RI price if its in the most recent run.
  1647. mostRecentDate := ""
  1648. iter := op.ResultSet.Rows
  1649. if page == 0 && len(iter) > 0 {
  1650. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1651. }
  1652. page++
  1653. for _, r := range iter {
  1654. d := *r.Data[0].VarCharValue
  1655. if mostRecentDate == "" {
  1656. mostRecentDate = d
  1657. } else if mostRecentDate != d { // Get all most recent assignments
  1658. break
  1659. }
  1660. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1661. if err != nil {
  1662. log.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1663. }
  1664. r := &RIData{
  1665. ResourceID: *r.Data[2].VarCharValue,
  1666. EffectiveCost: cost,
  1667. ReservationARN: *r.Data[1].VarCharValue,
  1668. MostRecentDate: d,
  1669. }
  1670. aws.RIPricingByInstanceID[r.ResourceID] = r
  1671. }
  1672. log.Debugf("Found %d reserved instances", len(aws.RIPricingByInstanceID))
  1673. for k, r := range aws.RIPricingByInstanceID {
  1674. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1675. }
  1676. aws.RIDataLock.Unlock()
  1677. return true
  1678. }
  1679. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1680. log.Debugf("Running Query: %s", query)
  1681. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  1682. if err != nil {
  1683. aws.RIPricingError = err
  1684. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1685. }
  1686. aws.RIPricingError = nil
  1687. return nil
  1688. }
  1689. // fetchColumns returns a list of the names of all columns in the configured
  1690. // Athena tables
  1691. func (aws *AWS) fetchColumns() (map[string]bool, error) {
  1692. columnSet := map[string]bool{}
  1693. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  1694. if err != nil {
  1695. return nil, err
  1696. }
  1697. // This Query is supported by Athena tables and views
  1698. q := `SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'`
  1699. query := fmt.Sprintf(q, awsAthenaInfo.AthenaDatabase, awsAthenaInfo.AthenaTable)
  1700. pageNum := 0
  1701. athenaErr := aws.QueryAthenaPaginated(context.TODO(), query, func(page *athena.GetQueryResultsOutput) bool {
  1702. if page == nil {
  1703. log.Errorf("fetchColumns: Athena page is nil")
  1704. return false
  1705. } else if page.ResultSet == nil {
  1706. log.Errorf("fetchColumns: Athena page.ResultSet is nil")
  1707. return false
  1708. }
  1709. // remove header row 'column_name'
  1710. rows := page.ResultSet.Rows[1:]
  1711. for _, row := range rows {
  1712. columnSet[*row.Data[0].VarCharValue] = true
  1713. }
  1714. pageNum++
  1715. return true
  1716. })
  1717. if athenaErr != nil {
  1718. return columnSet, athenaErr
  1719. }
  1720. if len(columnSet) == 0 {
  1721. log.Infof("No columns retrieved from Athena")
  1722. }
  1723. return columnSet, nil
  1724. }
  1725. type spotInfo struct {
  1726. Timestamp string `csv:"Timestamp"`
  1727. UsageType string `csv:"UsageType"`
  1728. Operation string `csv:"Operation"`
  1729. InstanceID string `csv:"InstanceID"`
  1730. MyBidID string `csv:"MyBidID"`
  1731. MyMaxPrice string `csv:"MyMaxPrice"`
  1732. MarketPrice string `csv:"MarketPrice"`
  1733. Charge string `csv:"Charge"`
  1734. Version string `csv:"Version"`
  1735. }
  1736. func (aws *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
  1737. aws.ConfigureAuth() // configure aws api authentication by setting env vars
  1738. s3Prefix := projectID
  1739. if len(prefix) != 0 {
  1740. s3Prefix = prefix + "/" + s3Prefix
  1741. }
  1742. aak, err := aws.GetAWSAccessKey()
  1743. if err != nil {
  1744. return nil, err
  1745. }
  1746. cfg, err := aak.CreateConfig(region)
  1747. if err != nil {
  1748. return nil, err
  1749. }
  1750. cli := s3.NewFromConfig(cfg)
  1751. downloader := manager.NewDownloader(cli)
  1752. tNow := time.Now()
  1753. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1754. ls := &s3.ListObjectsInput{
  1755. Bucket: awsSDK.String(bucket),
  1756. Prefix: awsSDK.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1757. }
  1758. ls2 := &s3.ListObjectsInput{
  1759. Bucket: awsSDK.String(bucket),
  1760. Prefix: awsSDK.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1761. }
  1762. lso, err := cli.ListObjects(context.TODO(), ls)
  1763. if err != nil {
  1764. aws.serviceAccountChecks.set("bucketList", &ServiceAccountCheck{
  1765. Message: "Bucket List Permissions Available",
  1766. Status: false,
  1767. AdditionalInfo: err.Error(),
  1768. })
  1769. return nil, err
  1770. } else {
  1771. aws.serviceAccountChecks.set("bucketList", &ServiceAccountCheck{
  1772. Message: "Bucket List Permissions Available",
  1773. Status: true,
  1774. })
  1775. }
  1776. lsoLen := len(lso.Contents)
  1777. log.Debugf("Found %d spot data files from yesterday", lsoLen)
  1778. if lsoLen == 0 {
  1779. log.Debugf("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1780. }
  1781. lso2, err := cli.ListObjects(context.TODO(), ls2)
  1782. if err != nil {
  1783. return nil, err
  1784. }
  1785. lso2Len := len(lso2.Contents)
  1786. log.Debugf("Found %d spot data files from today", lso2Len)
  1787. if lso2Len == 0 {
  1788. log.Debugf("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1789. }
  1790. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1791. var keys []*string
  1792. for _, obj := range lso.Contents {
  1793. keys = append(keys, obj.Key)
  1794. }
  1795. for _, obj := range lso2.Contents {
  1796. keys = append(keys, obj.Key)
  1797. }
  1798. header, err := csvutil.Header(spotInfo{}, "csv")
  1799. if err != nil {
  1800. return nil, err
  1801. }
  1802. fieldsPerRecord := len(header)
  1803. spots := make(map[string]*spotInfo)
  1804. for _, key := range keys {
  1805. getObj := &s3.GetObjectInput{
  1806. Bucket: awsSDK.String(bucket),
  1807. Key: key,
  1808. }
  1809. buf := manager.NewWriteAtBuffer([]byte{})
  1810. _, err := downloader.Download(context.TODO(), buf, getObj)
  1811. if err != nil {
  1812. aws.serviceAccountChecks.set("objectList", &ServiceAccountCheck{
  1813. Message: "Object Get Permissions Available",
  1814. Status: false,
  1815. AdditionalInfo: err.Error(),
  1816. })
  1817. return nil, err
  1818. } else {
  1819. aws.serviceAccountChecks.set("objectList", &ServiceAccountCheck{
  1820. Message: "Object Get Permissions Available",
  1821. Status: true,
  1822. })
  1823. }
  1824. r := bytes.NewReader(buf.Bytes())
  1825. gr, err := gzip.NewReader(r)
  1826. if err != nil {
  1827. return nil, err
  1828. }
  1829. csvReader := csv.NewReader(gr)
  1830. csvReader.Comma = '\t'
  1831. csvReader.FieldsPerRecord = fieldsPerRecord
  1832. dec, err := csvutil.NewDecoder(csvReader, header...)
  1833. if err != nil {
  1834. return nil, err
  1835. }
  1836. var foundVersion string
  1837. for {
  1838. spot := spotInfo{}
  1839. err := dec.Decode(&spot)
  1840. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1841. if err == io.EOF {
  1842. break
  1843. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1844. rec := dec.Record()
  1845. // the first two "Record()" will be the comment lines
  1846. // and they show up as len() == 1
  1847. // the first of which is "#Version"
  1848. // the second of which is "#Fields: "
  1849. if len(rec) != 1 {
  1850. log.Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1851. continue
  1852. }
  1853. if len(foundVersion) == 0 {
  1854. spotFeedVersion := rec[0]
  1855. log.Debugf("Spot feed version is \"%s\"", spotFeedVersion)
  1856. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1857. if matches != nil {
  1858. foundVersion = matches[1]
  1859. if foundVersion != supportedSpotFeedVersion {
  1860. log.Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1861. break
  1862. }
  1863. }
  1864. continue
  1865. } else if strings.Index(rec[0], "#") == 0 {
  1866. continue
  1867. } else {
  1868. log.Infof("skipping non-TSV line: %s", rec)
  1869. continue
  1870. }
  1871. } else if err != nil {
  1872. log.Warnf("Error during spot info decode: %+v", err)
  1873. continue
  1874. }
  1875. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  1876. spots[spot.InstanceID] = &spot
  1877. }
  1878. gr.Close()
  1879. }
  1880. return spots, nil
  1881. }
  1882. // ApplyReservedInstancePricing TODO
  1883. func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1884. }
  1885. func (aws *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  1886. return aws.serviceAccountChecks.getStatus()
  1887. }
  1888. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  1889. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  1890. }
  1891. // Regions returns a predefined list of AWS regions
  1892. func (aws *AWS) Regions() []string {
  1893. return awsRegions
  1894. }