provider.go 81 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632
  1. package aws
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/csv"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/aws/smithy-go"
  18. "github.com/opencost/opencost/pkg/cloud/models"
  19. "github.com/opencost/opencost/pkg/cloud/utils"
  20. "github.com/opencost/opencost/core/pkg/clustercache"
  21. coreenv "github.com/opencost/opencost/core/pkg/env"
  22. errs "github.com/opencost/opencost/core/pkg/errors"
  23. "github.com/opencost/opencost/core/pkg/log"
  24. "github.com/opencost/opencost/core/pkg/opencost"
  25. "github.com/opencost/opencost/core/pkg/util"
  26. "github.com/opencost/opencost/core/pkg/util/fileutil"
  27. "github.com/opencost/opencost/core/pkg/util/json"
  28. "github.com/opencost/opencost/core/pkg/util/timeutil"
  29. "github.com/opencost/opencost/pkg/env"
  30. awsSDK "github.com/aws/aws-sdk-go-v2/aws"
  31. "github.com/aws/aws-sdk-go-v2/config"
  32. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  33. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  34. "github.com/aws/aws-sdk-go-v2/service/athena"
  35. athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
  36. "github.com/aws/aws-sdk-go-v2/service/ec2"
  37. ec2Types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
  38. "github.com/aws/aws-sdk-go-v2/service/s3"
  39. "github.com/aws/aws-sdk-go-v2/service/sts"
  40. "github.com/jszwec/csvutil"
  41. )
  42. const (
  43. supportedSpotFeedVersion = "1"
  44. SpotInfoUpdateType = "spotinfo"
  45. AthenaInfoUpdateType = "athenainfo"
  46. PreemptibleType = "preemptible"
  47. APIPricingSource = "Public API"
  48. SpotPricingSource = "Spot Data Feed"
  49. SpotPriceHistorySource = "Spot Price History"
  50. ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
  51. FargatePricingSource = "Fargate"
  52. InUseState = "in-use"
  53. AttachedState = "attached"
  54. AWSHourlyPublicIPCost = 0.005
  55. EKSCapacityTypeLabel = "eks.amazonaws.com/capacityType"
  56. EKSCapacitySpotTypeValue = "SPOT"
  57. // relevant to pricing url
  58. awsPricingBaseURL = "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/"
  59. awsChinaPricingBaseURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/"
  60. pricingCurrentPath = "/current/"
  61. pricingIndexFile = "index.json"
  62. chinaRegionPrefix = "cn-"
  63. )
  64. var (
  65. // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  66. provIdRx = regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  67. usageTypeRegx = regexp.MustCompile(".*(-|^)(EBS.+)")
  68. versionRx = regexp.MustCompile(`^#Version: (\\d+)\\.\\d+$`)
  69. regionRx = regexp.MustCompile("([a-z]+-[a-z]+-[0-9])")
  70. ErrNoAthenaBucket = errors.New("No Athena Bucket configured")
  71. // StorageClassProvisionerDefaults specifies the default storage class types depending upon the provisioner
  72. StorageClassProvisionerDefaults = map[string]string{
  73. "kubernetes.io/aws-ebs": "gp2",
  74. "ebs.csi.aws.com": "gp3",
  75. // TODO: add efs provisioner
  76. }
  77. )
  78. func (aws *AWS) PricingSourceStatus() map[string]*models.PricingSource {
  79. sources := make(map[string]*models.PricingSource)
  80. sps := &models.PricingSource{
  81. Name: SpotPricingSource,
  82. Enabled: true,
  83. }
  84. if aws.SpotFeedRefreshEnabled() {
  85. sps.Error = ""
  86. if aws.SpotPricingError != nil {
  87. sps.Error = aws.SpotPricingError.Error()
  88. }
  89. if sps.Error != "" {
  90. sps.Available = false
  91. } else if len(aws.SpotPricingByInstanceID) > 0 {
  92. sps.Available = true
  93. } else {
  94. sps.Error = "No spot instances detected"
  95. }
  96. } else {
  97. sps.Available = false
  98. sps.Error = "Spot instances not set up"
  99. sps.Enabled = false
  100. }
  101. sources[SpotPricingSource] = sps
  102. sphs := &models.PricingSource{
  103. Name: SpotPriceHistorySource,
  104. Enabled: true,
  105. }
  106. if aws.SpotPriceHistoryError != nil {
  107. sphs.Error = aws.SpotPriceHistoryError.Error()
  108. sphs.Available = false
  109. } else if aws.SpotPriceHistoryCache == nil {
  110. sphs.Error = "Not yet initialized"
  111. sphs.Available = false
  112. } else {
  113. sphs.Available = true
  114. }
  115. sources[SpotPriceHistorySource] = sphs
  116. rps := &models.PricingSource{
  117. Name: ReservedInstancePricingSource,
  118. Enabled: true,
  119. }
  120. rps.Error = ""
  121. if aws.RIPricingError != nil {
  122. rps.Error = aws.RIPricingError.Error()
  123. }
  124. if rps.Error != "" {
  125. rps.Available = false
  126. } else {
  127. rps.Available = true
  128. }
  129. sources[ReservedInstancePricingSource] = rps
  130. fs := &models.PricingSource{
  131. Name: FargatePricingSource,
  132. Enabled: true,
  133. Available: true,
  134. }
  135. if aws.FargatePricingError != nil {
  136. fs.Error = aws.FargatePricingError.Error()
  137. fs.Available = false
  138. }
  139. sources[FargatePricingSource] = fs
  140. return sources
  141. }
  142. // SpotRefreshDuration represents how much time must pass before we refresh
  143. const SpotRefreshDuration = 15 * time.Minute
  144. var awsRegions = []string{
  145. "us-east-2",
  146. "us-east-1",
  147. "us-west-1",
  148. "us-west-2",
  149. "ap-east-1",
  150. "ap-south-1",
  151. "ap-northeast-3",
  152. "ap-northeast-2",
  153. "ap-southeast-1",
  154. "ap-southeast-2",
  155. "ap-northeast-1",
  156. "ap-southeast-3",
  157. "ca-central-1",
  158. "cn-north-1",
  159. "cn-northwest-1",
  160. "eu-central-1",
  161. "eu-west-1",
  162. "eu-west-2",
  163. "eu-west-3",
  164. "eu-north-1",
  165. "eu-south-1",
  166. "me-south-1",
  167. "sa-east-1",
  168. "af-south-1",
  169. "us-gov-east-1",
  170. "us-gov-west-1",
  171. "me-central-1",
  172. }
  173. // AWS represents an Amazon Provider
  174. type AWS struct {
  175. Pricing map[string]*AWSProductTerms
  176. SpotPricingByInstanceID map[string]*spotInfo
  177. SpotPricingUpdatedAt *time.Time
  178. SpotRefreshRunning bool
  179. SpotPricingLock sync.RWMutex
  180. SpotPricingError error
  181. SpotPriceHistoryCache *SpotPriceHistoryCache
  182. SpotPriceHistoryError error
  183. RIPricingByInstanceID map[string]*RIData
  184. RIPricingError error
  185. RIDataRunning bool
  186. RIDataLock sync.RWMutex
  187. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  188. SavingsPlanDataRunning bool
  189. SavingsPlanDataLock sync.RWMutex
  190. FargatePricing *FargatePricing
  191. FargatePricingError error
  192. ValidPricingKeys map[string]bool
  193. Clientset clustercache.ClusterCache
  194. BaseCPUPrice string
  195. BaseRAMPrice string
  196. BaseGPUPrice string
  197. BaseSpotCPUPrice string
  198. BaseSpotRAMPrice string
  199. BaseSpotGPUPrice string
  200. SpotLabelName string
  201. SpotLabelValue string
  202. SpotDataRegion string
  203. SpotDataBucket string
  204. SpotDataPrefix string
  205. ProjectID string
  206. DownloadPricingDataLock sync.RWMutex
  207. Config models.ProviderConfig
  208. ServiceAccountChecks *models.ServiceAccountChecks
  209. clusterManagementPrice float64
  210. ClusterRegion string
  211. ClusterAccountID string
  212. clusterProvisioner string
  213. }
  214. // AWSAccessKey holds AWS credentials and fulfils the awsV2.CredentialsProvider interface
  215. // Deprecated: v1.104 Use AccessKey instead
  216. type AWSAccessKey struct {
  217. AccessKeyID string `json:"aws_access_key_id"`
  218. SecretAccessKey string `json:"aws_secret_access_key"`
  219. }
  220. // Retrieve returns a set of awsV2 credentials using the AWSAccessKey's key and secret.
  221. // This fulfils the awsV2.CredentialsProvider interface contract.
  222. func (accessKey AWSAccessKey) Retrieve(ctx context.Context) (awsSDK.Credentials, error) {
  223. return awsSDK.Credentials{
  224. AccessKeyID: accessKey.AccessKeyID,
  225. SecretAccessKey: accessKey.SecretAccessKey,
  226. }, nil
  227. }
  228. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains for the provided region
  229. func (accessKey AWSAccessKey) CreateConfig(region string) (awsSDK.Config, error) {
  230. var cfg awsSDK.Config
  231. var err error
  232. // If accessKey values have not been provided, attempt to load cfg from service key annotations
  233. if accessKey.AccessKeyID == "" && accessKey.SecretAccessKey == "" {
  234. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithRegion(region))
  235. if err != nil {
  236. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region from annotation %s: %s", region, err)
  237. }
  238. } else {
  239. // The AWS SDK v2 requires an object fulfilling the CredentialsProvider interface, which cloud.AWSAccessKey does
  240. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithCredentialsProvider(accessKey), config.WithRegion(region))
  241. if err != nil {
  242. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region %s: %s", region, err)
  243. }
  244. }
  245. return cfg, nil
  246. }
  247. // AWSProductTerms represents the full terms of the product
  248. type AWSProductTerms struct {
  249. Sku string `json:"sku"`
  250. OnDemand *PriceListEC2Term `json:"OnDemand"`
  251. Reserved *PriceListEC2Term `json:"Reserved"`
  252. Memory string `json:"memory"`
  253. Storage string `json:"storage"`
  254. VCpu string `json:"vcpu"`
  255. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  256. PV *models.PV `json:"pv"`
  257. LoadBalancer *models.LoadBalancer `json:"load_balancer"`
  258. }
  259. // volTypes are used to map between AWS UsageTypes and
  260. // EBS volume types, as they would appear in K8s storage class
  261. // name and the EC2 API.
  262. var volTypes = map[string]string{
  263. "EBS:VolumeUsage.gp2": "gp2",
  264. "EBS:VolumeUsage.gp3": "gp3",
  265. "EBS:VolumeUsage": "standard",
  266. "EBS:VolumeUsage.sc1": "sc1",
  267. "EBS:VolumeP-IOPS.piops": "io1",
  268. "EBS:VolumeUsage.st1": "st1",
  269. "EBS:VolumeUsage.piops": "io1",
  270. "EBS:VolumeUsage.io2": "io2",
  271. "gp2": "EBS:VolumeUsage.gp2",
  272. "gp3": "EBS:VolumeUsage.gp3",
  273. "standard": "EBS:VolumeUsage",
  274. "sc1": "EBS:VolumeUsage.sc1",
  275. "io1": "EBS:VolumeUsage.piops",
  276. "st1": "EBS:VolumeUsage.st1",
  277. "io2": "EBS:VolumeUsage.io2",
  278. }
  279. var (
  280. loadedAWSSecret bool = false
  281. awsSecret *AWSAccessKey = nil
  282. )
  283. // KubeAttrConversion maps the k8s labels for region to an AWS key
  284. func (aws *AWS) KubeAttrConversion(region, instanceType, operatingSystem string) string {
  285. operatingSystem = strings.ToLower(operatingSystem)
  286. return region + "," + instanceType + "," + operatingSystem
  287. }
  288. // AwsSpotFeedInfo contains configuration for spot feed integration
  289. type AwsSpotFeedInfo struct {
  290. BucketName string `json:"bucketName"`
  291. Prefix string `json:"prefix"`
  292. Region string `json:"region"`
  293. AccountID string `json:"projectID"`
  294. ServiceKeyName string `json:"serviceKeyName"`
  295. ServiceKeySecret string `json:"serviceKeySecret"`
  296. SpotLabel string `json:"spotLabel"`
  297. SpotLabelValue string `json:"spotLabelValue"`
  298. }
  299. // AwsAthenaInfo contains configuration for CUR integration
  300. // Deprecated: v1.104 Use AthenaConfiguration instead
  301. type AwsAthenaInfo struct {
  302. AthenaBucketName string `json:"athenaBucketName"`
  303. AthenaRegion string `json:"athenaRegion"`
  304. AthenaDatabase string `json:"athenaDatabase"`
  305. AthenaCatalog string `json:"athenaCatalog"`
  306. AthenaTable string `json:"athenaTable"`
  307. AthenaWorkgroup string `json:"athenaWorkgroup"`
  308. ServiceKeyName string `json:"serviceKeyName"`
  309. ServiceKeySecret string `json:"serviceKeySecret"`
  310. AccountID string `json:"projectID"`
  311. MasterPayerARN string `json:"masterPayerARN"`
  312. }
  313. // IsEmpty returns true if all fields in config are empty, false if not.
  314. func (aai *AwsAthenaInfo) IsEmpty() bool {
  315. return aai.AthenaBucketName == "" &&
  316. aai.AthenaRegion == "" &&
  317. aai.AthenaDatabase == "" &&
  318. aai.AthenaCatalog == "" &&
  319. aai.AthenaTable == "" &&
  320. aai.AthenaWorkgroup == "" &&
  321. aai.ServiceKeyName == "" &&
  322. aai.ServiceKeySecret == "" &&
  323. aai.AccountID == "" &&
  324. aai.MasterPayerARN == ""
  325. }
  326. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains
  327. func (aai *AwsAthenaInfo) CreateConfig() (awsSDK.Config, error) {
  328. keyProvider := AWSAccessKey{AccessKeyID: aai.ServiceKeyName, SecretAccessKey: aai.ServiceKeySecret}
  329. cfg, err := keyProvider.CreateConfig(aai.AthenaRegion)
  330. if err != nil {
  331. return cfg, err
  332. }
  333. if aai.MasterPayerARN != "" {
  334. // Create the credentials from AssumeRoleProvider to assume the role
  335. // referenced by the roleARN.
  336. stsSvc := sts.NewFromConfig(cfg)
  337. creds := stscreds.NewAssumeRoleProvider(stsSvc, aai.MasterPayerARN)
  338. cfg.Credentials = awsSDK.NewCredentialsCache(creds)
  339. }
  340. return cfg, nil
  341. }
  342. func (aws *AWS) GetManagementPlatform() (string, error) {
  343. nodes := aws.Clientset.GetAllNodes()
  344. if len(nodes) > 0 {
  345. n := nodes[0]
  346. version := n.Status.NodeInfo.KubeletVersion
  347. if strings.Contains(version, "eks") {
  348. return "eks", nil
  349. }
  350. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  351. return "kops", nil
  352. }
  353. }
  354. return "", nil
  355. }
  356. func (aws *AWS) GetConfig() (*models.CustomPricing, error) {
  357. c, err := aws.Config.GetCustomPricingData()
  358. if err != nil {
  359. return nil, err
  360. }
  361. if c.Discount == "" {
  362. c.Discount = "0%"
  363. }
  364. if c.NegotiatedDiscount == "" {
  365. c.NegotiatedDiscount = "0%"
  366. }
  367. return c, nil
  368. }
  369. // GetAWSAccessKey generate an AWSAccessKey object from the config
  370. func (aws *AWS) GetAWSAccessKey() (*AWSAccessKey, error) {
  371. config, err := aws.GetConfig()
  372. if err != nil {
  373. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  374. }
  375. err = aws.ConfigureAuthWith(config)
  376. if err != nil {
  377. return nil, fmt.Errorf("error configuring Cloud Provider %s", err)
  378. }
  379. // Look for service key values in env if not present in config
  380. if config.AwsServiceKeyName == "" {
  381. config.AwsServiceKeyName = env.GetAWSAccessKeyID()
  382. }
  383. if config.AwsServiceKeySecret == "" {
  384. config.AwsServiceKeySecret = env.GetAWSAccessKeySecret()
  385. }
  386. if config.AwsServiceKeyName == "" && config.AwsServiceKeySecret == "" {
  387. log.DedupedInfof(1, "missing service key values for AWS cloud integration attempting to use service account integration")
  388. }
  389. return &AWSAccessKey{AccessKeyID: config.AwsServiceKeyName, SecretAccessKey: config.AwsServiceKeySecret}, nil
  390. }
  391. // GetAWSAthenaInfo generate an AWSAthenaInfo object from the config
  392. func (aws *AWS) GetAWSAthenaInfo() (*AwsAthenaInfo, error) {
  393. config, err := aws.GetConfig()
  394. if err != nil {
  395. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  396. }
  397. aak, err := aws.GetAWSAccessKey()
  398. if err != nil {
  399. return nil, err
  400. }
  401. return &AwsAthenaInfo{
  402. AthenaBucketName: config.AthenaBucketName,
  403. AthenaRegion: config.AthenaRegion,
  404. AthenaDatabase: config.AthenaDatabase,
  405. AthenaCatalog: config.AthenaCatalog,
  406. AthenaTable: config.AthenaTable,
  407. AthenaWorkgroup: config.AthenaWorkgroup,
  408. ServiceKeyName: aak.AccessKeyID,
  409. ServiceKeySecret: aak.SecretAccessKey,
  410. AccountID: config.AthenaProjectID,
  411. MasterPayerARN: config.MasterPayerARN,
  412. }, nil
  413. }
  414. func (aws *AWS) UpdateConfigFromConfigMap(cm map[string]string) (*models.CustomPricing, error) {
  415. return aws.Config.UpdateFromMap(cm)
  416. }
  417. func configUpdaterWithReaderAndType(r io.Reader, updateType string) func(c *models.CustomPricing) error {
  418. return func(c *models.CustomPricing) error {
  419. switch updateType {
  420. case SpotInfoUpdateType:
  421. asfi := AwsSpotFeedInfo{}
  422. err := json.NewDecoder(r).Decode(&asfi)
  423. if err != nil {
  424. return err
  425. }
  426. c.AwsServiceKeyName = asfi.ServiceKeyName
  427. if asfi.ServiceKeySecret != "" {
  428. c.AwsServiceKeySecret = asfi.ServiceKeySecret
  429. }
  430. c.AwsSpotDataPrefix = asfi.Prefix
  431. c.AwsSpotDataBucket = asfi.BucketName
  432. c.ProjectID = asfi.AccountID
  433. c.AwsSpotDataRegion = asfi.Region
  434. c.SpotLabel = asfi.SpotLabel
  435. c.SpotLabelValue = asfi.SpotLabelValue
  436. case AthenaInfoUpdateType:
  437. aai := AwsAthenaInfo{}
  438. err := json.NewDecoder(r).Decode(&aai)
  439. if err != nil {
  440. return err
  441. }
  442. c.AthenaBucketName = aai.AthenaBucketName
  443. c.AthenaRegion = aai.AthenaRegion
  444. c.AthenaDatabase = aai.AthenaDatabase
  445. c.AthenaCatalog = aai.AthenaCatalog
  446. c.AthenaTable = aai.AthenaTable
  447. c.AthenaWorkgroup = aai.AthenaWorkgroup
  448. c.AwsServiceKeyName = aai.ServiceKeyName
  449. if aai.ServiceKeySecret != "" {
  450. c.AwsServiceKeySecret = aai.ServiceKeySecret
  451. }
  452. if aai.MasterPayerARN != "" {
  453. c.MasterPayerARN = aai.MasterPayerARN
  454. }
  455. c.AthenaProjectID = aai.AccountID
  456. default:
  457. a := make(map[string]any)
  458. err := json.NewDecoder(r).Decode(&a)
  459. if err != nil {
  460. return err
  461. }
  462. for k, v := range a {
  463. kUpper := utils.ToTitle.String(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  464. vstr, ok := v.(string)
  465. if ok {
  466. err := models.SetCustomPricingField(c, kUpper, vstr)
  467. if err != nil {
  468. return fmt.Errorf("error setting custom pricing field: %w", err)
  469. }
  470. } else {
  471. return fmt.Errorf("type error while updating config for %s", kUpper)
  472. }
  473. }
  474. }
  475. if env.IsRemoteEnabled() {
  476. err := utils.UpdateClusterMeta(coreenv.GetClusterID(), c.ClusterName)
  477. if err != nil {
  478. return err
  479. }
  480. }
  481. return nil
  482. }
  483. }
  484. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*models.CustomPricing, error) {
  485. return aws.Config.Update(configUpdaterWithReaderAndType(r, updateType))
  486. }
  487. type awsKey struct {
  488. Name string
  489. SpotLabelName string
  490. SpotLabelValue string
  491. Labels map[string]string
  492. ProviderID string
  493. }
  494. func (k *awsKey) GPUCount() int {
  495. return 0
  496. }
  497. func (k *awsKey) GPUType() string {
  498. return ""
  499. }
  500. func (k *awsKey) ID() string {
  501. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  502. if matchNum == 2 {
  503. return group
  504. }
  505. }
  506. log.Warnf("Could not find instance ID in \"%s\"", k.ProviderID)
  507. return ""
  508. }
  509. // Features will return a comma separated list of features for the given node
  510. // If the node has a spot label, it will be included in the list
  511. // Otherwise, the list include instance type, operating system, and the region
  512. func (k *awsKey) Features() string {
  513. instanceType, _ := util.GetInstanceType(k.Labels)
  514. operatingSystem, _ := util.GetOperatingSystem(k.Labels)
  515. region, _ := util.GetRegion(k.Labels)
  516. key := region + "," + instanceType + "," + operatingSystem
  517. usageType := k.getUsageType(k.Labels)
  518. spotKey := key + "," + usageType
  519. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  520. return spotKey
  521. }
  522. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  523. return spotKey
  524. }
  525. if usageType == PreemptibleType {
  526. return spotKey
  527. }
  528. return key
  529. }
  530. const eksComputeTypeLabel = "eks.amazonaws.com/compute-type"
  531. func (k *awsKey) isFargateNode() bool {
  532. v := k.Labels[eksComputeTypeLabel]
  533. if v == "fargate" {
  534. return true
  535. }
  536. return false
  537. }
  538. // getUsageType returns the usage type of the instance
  539. // If the instance is a spot instance, it will return PreemptibleType
  540. // Otherwise returns an empty string
  541. func (k *awsKey) getUsageType(labels map[string]string) string {
  542. if kLabel, ok := labels[k.SpotLabelName]; ok && kLabel == k.SpotLabelValue {
  543. return PreemptibleType
  544. }
  545. if eksLabel, ok := labels[EKSCapacityTypeLabel]; ok && eksLabel == EKSCapacitySpotTypeValue {
  546. // We currently write out spot instances as "preemptible" in the pricing data, so these need to match
  547. return PreemptibleType
  548. }
  549. if kLabel, ok := labels[models.KarpenterCapacityTypeLabel]; ok && kLabel == models.KarpenterCapacitySpotTypeValue {
  550. return PreemptibleType
  551. }
  552. return ""
  553. }
  554. func (awsProvider *AWS) GpuPricing(nodeLabels map[string]string) (string, error) {
  555. return "", nil
  556. }
  557. func (aws *AWS) PVPricing(pvk models.PVKey) (*models.PV, error) {
  558. pricing, ok := aws.Pricing[pvk.Features()]
  559. if !ok {
  560. log.Debugf("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  561. return &models.PV{}, nil
  562. }
  563. return pricing.PV, nil
  564. }
  565. type awsPVKey struct {
  566. Labels map[string]string
  567. StorageClassParameters map[string]string
  568. StorageClassName string
  569. Name string
  570. DefaultRegion string
  571. ProviderID string
  572. }
  573. func (aws *AWS) GetPVKey(pv *clustercache.PersistentVolume, parameters map[string]string, defaultRegion string) models.PVKey {
  574. providerID := ""
  575. if pv.Spec.AWSElasticBlockStore != nil {
  576. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  577. } else if pv.Spec.CSI != nil {
  578. providerID = pv.Spec.CSI.VolumeHandle
  579. }
  580. return &awsPVKey{
  581. Labels: pv.Labels,
  582. StorageClassName: pv.Spec.StorageClassName,
  583. StorageClassParameters: parameters,
  584. Name: pv.Name,
  585. DefaultRegion: defaultRegion,
  586. ProviderID: providerID,
  587. }
  588. }
  589. func (key *awsPVKey) ID() string {
  590. return key.ProviderID
  591. }
  592. func (key *awsPVKey) GetStorageClass() string {
  593. return key.StorageClassName
  594. }
  595. func (key *awsPVKey) Features() string {
  596. storageClass, ok := key.StorageClassParameters["type"]
  597. if !ok {
  598. log.Debugf("storage class %s doesn't have a 'type' parameter", key.Name)
  599. storageClass = getStorageClassTypeFrom(key.StorageClassParameters["provisioner"])
  600. }
  601. if storageClass == "standard" {
  602. storageClass = "gp2"
  603. }
  604. // Storage class names are generally EBS volume types (gp2)
  605. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  606. // Converts between the 2
  607. region, ok := util.GetRegion(key.Labels)
  608. if !ok {
  609. region = key.DefaultRegion
  610. }
  611. class, ok := volTypes[storageClass]
  612. if !ok {
  613. log.Debugf("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  614. }
  615. return region + "," + class
  616. }
  617. // getStorageClassTypeFrom returns the default ebs volume type for a provider provisioner
  618. func getStorageClassTypeFrom(provisioner string) string {
  619. // if there isn't any provided provisioner, return empty volume type
  620. if provisioner == "" {
  621. return ""
  622. }
  623. scType, ok := StorageClassProvisionerDefaults[provisioner]
  624. if ok {
  625. log.Debugf("using default voltype %s for provisioner %s", scType, provisioner)
  626. return scType
  627. }
  628. return ""
  629. }
  630. // GetKey maps node labels to information needed to retrieve pricing data
  631. func (aws *AWS) GetKey(labels map[string]string, n *clustercache.Node) models.Key {
  632. return &awsKey{
  633. Name: n.Name,
  634. SpotLabelName: aws.SpotLabelName,
  635. SpotLabelValue: aws.SpotLabelValue,
  636. Labels: labels,
  637. ProviderID: labels["providerID"],
  638. }
  639. }
  640. func (aws *AWS) isPreemptible(key string) bool {
  641. s := strings.Split(key, ",")
  642. if len(s) == 4 && s[3] == PreemptibleType {
  643. return true
  644. }
  645. return false
  646. }
  647. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  648. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  649. }
  650. func getPricingListURL(serviceCode string, nodeList []*clustercache.Node) string {
  651. // See https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/using-the-aws-price-list-bulk-api-fetching-price-list-files-manually.html
  652. region := ""
  653. multiregion := false
  654. isChina := false
  655. for _, n := range nodeList {
  656. r, ok := util.GetRegion(n.Labels)
  657. if !ok {
  658. multiregion = true
  659. break
  660. }
  661. if strings.HasPrefix(r, chinaRegionPrefix) {
  662. isChina = true
  663. }
  664. if region == "" {
  665. region = r
  666. } else if r != region {
  667. multiregion = true
  668. break
  669. }
  670. }
  671. baseURL := awsPricingBaseURL + serviceCode + pricingCurrentPath
  672. if isChina {
  673. // Chinese regions are isolated and use a different pricing endpoint
  674. baseURL = awsChinaPricingBaseURL + serviceCode + pricingCurrentPath
  675. }
  676. if region != "" && !multiregion {
  677. baseURL += region + "/"
  678. }
  679. return baseURL + pricingIndexFile
  680. }
  681. // Use the pricing data from the current region. Fall back to using all region data if needed.
  682. func (aws *AWS) getRegionPricing(nodeList []*clustercache.Node) (*http.Response, string, error) {
  683. var pricingURL string
  684. if env.GetAWSPricingURL() != "" { // Allow override of pricing URL
  685. pricingURL = env.GetAWSPricingURL()
  686. } else {
  687. pricingURL = getPricingListURL("AmazonEC2", nodeList)
  688. }
  689. log.Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  690. resp, err := http.Get(pricingURL)
  691. if err != nil {
  692. log.Errorf("Bogus fetch of \"%s\": %v", pricingURL, err)
  693. return nil, pricingURL, err
  694. }
  695. return resp, pricingURL, err
  696. }
  697. // SpotFeedRefreshEnabled determines whether the required configs to run the spot feed query have been set up
  698. func (aws *AWS) SpotFeedRefreshEnabled() bool {
  699. // Guard against nil receiver
  700. if aws == nil {
  701. return false
  702. }
  703. // Fallback if config is not initialized
  704. if aws.Config == nil {
  705. return len(aws.SpotDataBucket) != 0 ||
  706. len(aws.SpotDataRegion) != 0 ||
  707. len(aws.ProjectID) != 0
  708. }
  709. // Check if spot data feed is explicitly disabled via config
  710. c, err := aws.Config.GetCustomPricingData()
  711. if err == nil && c.SpotDataFeedEnabled == "false" {
  712. return false
  713. }
  714. // Default behavior
  715. return len(aws.SpotDataBucket) != 0 ||
  716. len(aws.SpotDataRegion) != 0 ||
  717. len(aws.ProjectID) != 0
  718. }
  719. // DownloadPricingData fetches data from the AWS Pricing API
  720. func (aws *AWS) DownloadPricingData() error {
  721. aws.DownloadPricingDataLock.Lock()
  722. defer aws.DownloadPricingDataLock.Unlock()
  723. c, err := aws.Config.GetCustomPricingData()
  724. if err != nil {
  725. log.Errorf("Error downloading default pricing data: %s", err.Error())
  726. }
  727. aws.BaseCPUPrice = c.CPU
  728. aws.BaseRAMPrice = c.RAM
  729. aws.BaseGPUPrice = c.GPU
  730. aws.BaseSpotCPUPrice = c.SpotCPU
  731. aws.BaseSpotRAMPrice = c.SpotRAM
  732. aws.BaseSpotGPUPrice = c.SpotGPU
  733. aws.SpotLabelName = c.SpotLabel
  734. aws.SpotLabelValue = c.SpotLabelValue
  735. aws.SpotDataBucket = c.AwsSpotDataBucket
  736. aws.SpotDataPrefix = c.AwsSpotDataPrefix
  737. aws.ProjectID = c.ProjectID
  738. aws.SpotDataRegion = c.AwsSpotDataRegion
  739. aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
  740. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  741. log.Warnf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  742. }
  743. nodeList := aws.Clientset.GetAllNodes()
  744. inputkeys := make(map[string]bool)
  745. for _, n := range nodeList {
  746. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  747. aws.clusterManagementPrice = 0.10
  748. aws.clusterProvisioner = "EKS"
  749. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  750. aws.clusterProvisioner = "KOPS"
  751. }
  752. labels := n.Labels
  753. key := aws.GetKey(labels, n)
  754. inputkeys[key.Features()] = true
  755. }
  756. pvList := aws.Clientset.GetAllPersistentVolumes()
  757. storageClasses := aws.Clientset.GetAllStorageClasses()
  758. storageClassMap := make(map[string]map[string]string)
  759. for _, storageClass := range storageClasses {
  760. params := storageClass.Parameters
  761. if params != nil {
  762. params["provisioner"] = storageClass.Provisioner
  763. }
  764. storageClassMap[storageClass.Name] = params
  765. if storageClass.Annotations["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.Annotations["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  766. storageClassMap["default"] = params
  767. storageClassMap[""] = params
  768. }
  769. }
  770. pvkeys := make(map[string]models.PVKey)
  771. for _, pv := range pvList {
  772. params, ok := storageClassMap[pv.Spec.StorageClassName]
  773. if !ok {
  774. log.Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  775. continue
  776. }
  777. key := aws.GetPVKey(pv, params, "")
  778. pvkeys[key.Features()] = key
  779. }
  780. // RIDataRunning establishes the existence of the goroutine. Since it's possible we
  781. // run multiple downloads, we don't want to create multiple go routines if one already exists
  782. //
  783. // If athenaBucketName is unconfigured, the ReservedInstanceData and SavingsPlanData watchers
  784. // are skipped. Note: These watchers are less commonly used. It is recommended to use the full
  785. // CloudCosts feature via athenaintegration.go.
  786. if !aws.RIDataRunning {
  787. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  788. if err != nil {
  789. if errors.Is(err, ErrNoAthenaBucket) {
  790. log.Debugf("No \"athenaBucketName\" configured, ReservedInstanceData watcher will not run")
  791. } else {
  792. log.Warnf("Failed to lookup reserved instance data: %s", err.Error())
  793. }
  794. } else { // If we make one successful run, check on new reservation data every hour
  795. go func() {
  796. defer errs.HandlePanic()
  797. aws.RIDataRunning = true
  798. for {
  799. log.Infof("Reserved Instance watcher running... next update in 1h")
  800. time.Sleep(time.Hour)
  801. err := aws.GetReservationDataFromAthena()
  802. if err != nil {
  803. log.Infof("Error updating RI data: %s", err.Error())
  804. }
  805. }
  806. }()
  807. }
  808. }
  809. if !aws.SavingsPlanDataRunning {
  810. err = aws.GetSavingsPlanDataFromAthena()
  811. if err != nil {
  812. if errors.Is(err, ErrNoAthenaBucket) {
  813. log.Debugf("No \"athenaBucketName\" configured, SavingsPlanData watcher will not run")
  814. } else {
  815. log.Errorf("Failed to lookup savings plan data: %s", err.Error())
  816. }
  817. } else {
  818. go func() {
  819. defer errs.HandlePanic()
  820. aws.SavingsPlanDataRunning = true
  821. for {
  822. log.Infof("Savings Plan watcher running... next update in 1h")
  823. time.Sleep(time.Hour)
  824. err := aws.GetSavingsPlanDataFromAthena()
  825. if err != nil {
  826. log.Infof("Error updating Savings Plan data: %s", err.Error())
  827. }
  828. }
  829. }()
  830. }
  831. }
  832. // Initialize fargate pricing if it's not initialized yet
  833. if aws.FargatePricing == nil {
  834. aws.FargatePricing = NewFargatePricing()
  835. aws.FargatePricingError = aws.FargatePricing.Initialize(nodeList)
  836. if aws.FargatePricingError != nil {
  837. log.Errorf("Failed to initialize fargate pricing: %s", aws.FargatePricingError.Error())
  838. }
  839. }
  840. aws.ValidPricingKeys = make(map[string]bool)
  841. resp, pricingURL, err := aws.getRegionPricing(nodeList)
  842. if err != nil {
  843. return err
  844. }
  845. err = aws.populatePricing(resp, inputkeys)
  846. if err != nil {
  847. return err
  848. }
  849. log.Infof("Finished downloading \"%s\"", pricingURL)
  850. // Initialize a spot price history cache if not already initialized.
  851. // Reset error to allow retrying on subsequent DownloadPricingData calls.
  852. if aws.SpotPriceHistoryCache == nil {
  853. aws.SpotPriceHistoryError = nil
  854. aws.SpotPriceHistoryCache, aws.SpotPriceHistoryError = aws.initializeSpotPriceHistoryCache()
  855. if aws.SpotPriceHistoryError != nil {
  856. log.Errorf("Failed to initialize spot price history manager: %v", aws.SpotPriceHistoryError)
  857. }
  858. }
  859. if aws.SpotFeedRefreshEnabled() {
  860. // Always run spot pricing refresh when performing download
  861. aws.refreshSpotPricing(true)
  862. // Only start a single refresh goroutine
  863. if !aws.SpotRefreshRunning {
  864. aws.SpotRefreshRunning = true
  865. go func() {
  866. defer errs.HandlePanic()
  867. for {
  868. log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  869. time.Sleep(SpotRefreshDuration)
  870. // Reoccurring refresh checks update times
  871. aws.refreshSpotPricing(false)
  872. }
  873. }()
  874. }
  875. }
  876. return nil
  877. }
  878. func (aws *AWS) populatePricing(resp *http.Response, inputkeys map[string]bool) error {
  879. aws.Pricing = make(map[string]*AWSProductTerms)
  880. skusToKeys := make(map[string]string)
  881. dec := json.NewDecoder(resp.Body)
  882. for {
  883. t, err := dec.Token()
  884. if err == io.EOF {
  885. log.Infof("done loading \"%s\"\n", resp.Request.URL.String())
  886. break
  887. } else if err != nil {
  888. log.Errorf("error parsing response json %v", resp.Body)
  889. break
  890. }
  891. if t == "products" {
  892. _, err := dec.Token() // this should parse the opening "{""
  893. if err != nil {
  894. return err
  895. }
  896. for dec.More() {
  897. _, err := dec.Token() // the sku token
  898. if err != nil {
  899. return err
  900. }
  901. product := &PriceListEC2Product{}
  902. err = dec.Decode(&product)
  903. if err != nil {
  904. log.Errorf("Error parsing response from \"%s\": %v", resp.Request.URL.String(), err.Error())
  905. break
  906. }
  907. if product.Attributes.PreInstalledSw == "NA" &&
  908. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
  909. product.Attributes.CapacityStatus == "Used" &&
  910. product.Attributes.MarketOption == "OnDemand" {
  911. key := aws.KubeAttrConversion(product.Attributes.RegionCode, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  912. spotKey := key + ",preemptible"
  913. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  914. productTerms := &AWSProductTerms{
  915. Sku: product.Sku,
  916. Memory: product.Attributes.Memory,
  917. Storage: product.Attributes.Storage,
  918. VCpu: product.Attributes.VCpu,
  919. GPU: product.Attributes.GPU,
  920. }
  921. aws.Pricing[key] = productTerms
  922. aws.Pricing[spotKey] = productTerms
  923. skusToKeys[product.Sku] = key
  924. }
  925. aws.ValidPricingKeys[key] = true
  926. aws.ValidPricingKeys[spotKey] = true
  927. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  928. // UsageTypes may be prefixed with a region code - we're removing this when using
  929. // volTypes to keep lookups generic
  930. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  931. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  932. key := product.Attributes.RegionCode + "," + usageTypeNoRegion
  933. spotKey := key + ",preemptible"
  934. pv := &models.PV{
  935. Class: volTypes[usageTypeNoRegion],
  936. Region: product.Attributes.RegionCode,
  937. }
  938. productTerms := &AWSProductTerms{
  939. Sku: product.Sku,
  940. PV: pv,
  941. }
  942. aws.Pricing[key] = productTerms
  943. aws.Pricing[spotKey] = productTerms
  944. skusToKeys[product.Sku] = key
  945. aws.ValidPricingKeys[key] = true
  946. aws.ValidPricingKeys[spotKey] = true
  947. } else if strings.Contains(product.Attributes.UsageType, "LoadBalancerUsage") && product.Attributes.Operation == "LoadBalancing:Network" {
  948. // since the costmodel is only using services of type LoadBalancer
  949. // (and not ingresses controlled by AWS load balancer controller)
  950. // we can safely filter for Network load balancers only
  951. productTerms := &AWSProductTerms{
  952. Sku: product.Sku,
  953. LoadBalancer: &models.LoadBalancer{},
  954. }
  955. // there is no spot pricing for load balancers
  956. key := product.Attributes.RegionCode + ",LoadBalancerUsage"
  957. aws.Pricing[key] = productTerms
  958. skusToKeys[product.Sku] = key
  959. aws.ValidPricingKeys[key] = true
  960. }
  961. }
  962. }
  963. if t == "terms" {
  964. _, err := dec.Token() // this should parse the opening "{""
  965. if err != nil {
  966. return err
  967. }
  968. termType, err := dec.Token()
  969. if err != nil {
  970. return err
  971. }
  972. if termType == "OnDemand" {
  973. _, err := dec.Token()
  974. if err != nil { // again, should parse an opening "{"
  975. return err
  976. }
  977. for dec.More() {
  978. sku, err := dec.Token()
  979. if err != nil {
  980. return err
  981. }
  982. _, err = dec.Token() // another opening "{"
  983. if err != nil {
  984. return err
  985. }
  986. // SKUOndemand
  987. _, err = dec.Token()
  988. if err != nil {
  989. return err
  990. }
  991. offerTerm := &PriceListEC2Term{}
  992. err = dec.Decode(&offerTerm)
  993. if err != nil {
  994. log.Errorf("Error decoding AWS Offer Term: %s", err.Error())
  995. }
  996. key, ok := skusToKeys[sku.(string)]
  997. spotKey := key + ",preemptible"
  998. if ok {
  999. aws.Pricing[key].OnDemand = offerTerm
  1000. if _, ok := aws.Pricing[spotKey]; ok {
  1001. aws.Pricing[spotKey].OnDemand = offerTerm
  1002. }
  1003. var cost string
  1004. if _, isMatch := OnDemandRateCodes[offerTerm.OfferTermCode]; isMatch {
  1005. priceDimensionKey := strings.Join([]string{sku.(string), offerTerm.OfferTermCode, HourlyRateCode}, ".")
  1006. dimension, ok := offerTerm.PriceDimensions[priceDimensionKey]
  1007. if ok {
  1008. cost = dimension.PricePerUnit.USD
  1009. } else {
  1010. // this is an edge case seen in AWS CN pricing files, including here just in case
  1011. // if there is only one dimension, use it, even if the key is incorrect, otherwise assume defaults
  1012. if len(offerTerm.PriceDimensions) == 1 {
  1013. for key, backupDimension := range offerTerm.PriceDimensions {
  1014. cost = backupDimension.PricePerUnit.USD
  1015. log.DedupedWarningf(5, "using:%s for a price dimension instead of missing dimension: %s", offerTerm.PriceDimensions[key], priceDimensionKey)
  1016. break
  1017. }
  1018. } else if len(offerTerm.PriceDimensions) == 0 {
  1019. log.DedupedWarningf(5, "populatePricing: no pricing dimension available for: %s.", priceDimensionKey)
  1020. } else {
  1021. log.DedupedWarningf(5, "populatePricing: no assumable pricing dimension available for: %s.", priceDimensionKey)
  1022. }
  1023. }
  1024. } else if _, isMatch := OnDemandRateCodesCn[offerTerm.OfferTermCode]; isMatch {
  1025. priceDimensionKey := strings.Join([]string{sku.(string), offerTerm.OfferTermCode, HourlyRateCodeCn}, ".")
  1026. dimension, ok := offerTerm.PriceDimensions[priceDimensionKey]
  1027. if ok {
  1028. cost = dimension.PricePerUnit.CNY
  1029. } else {
  1030. // fall through logic for handling inconsistencies in AWS CN pricing files
  1031. // if there is only one dimension, use it, even if the key is incorrect, otherwise assume defaults
  1032. if len(offerTerm.PriceDimensions) == 1 {
  1033. for key, backupDimension := range offerTerm.PriceDimensions {
  1034. cost = backupDimension.PricePerUnit.CNY
  1035. log.DedupedWarningf(5, "using:%s for a price dimension instead of missing dimension: %s", offerTerm.PriceDimensions[key], priceDimensionKey)
  1036. break
  1037. }
  1038. } else if len(offerTerm.PriceDimensions) == 0 {
  1039. log.DedupedWarningf(5, "populatePricing: no pricing dimension available for: %s.", priceDimensionKey)
  1040. } else {
  1041. log.DedupedWarningf(5, "populatePricing: no assumable pricing dimension available for: %s.", priceDimensionKey)
  1042. }
  1043. }
  1044. }
  1045. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  1046. // If the specific UsageType is the per IO cost used on io1 volumes
  1047. // we need to add the per IO cost to the io1 PV cost
  1048. // Add the per IO cost to the PV object for the io1 volume type
  1049. aws.Pricing[key].PV.CostPerIO = cost
  1050. } else if strings.Contains(key, "EBS:Volume") {
  1051. // If volume, we need to get hourly cost and add it to the PV object
  1052. costFloat, _ := strconv.ParseFloat(cost, 64)
  1053. hourlyPrice := costFloat / 730
  1054. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  1055. } else if strings.Contains(key, "LoadBalancerUsage") {
  1056. costFloat, err := strconv.ParseFloat(cost, 64)
  1057. if err != nil {
  1058. return err
  1059. }
  1060. aws.Pricing[key].LoadBalancer.Cost = costFloat
  1061. }
  1062. }
  1063. _, err = dec.Token()
  1064. if err != nil {
  1065. return err
  1066. }
  1067. }
  1068. _, err = dec.Token()
  1069. if err != nil {
  1070. return err
  1071. }
  1072. }
  1073. }
  1074. }
  1075. return nil
  1076. }
  1077. func (aws *AWS) refreshSpotPricing(force bool) {
  1078. aws.SpotPricingLock.Lock()
  1079. defer aws.SpotPricingLock.Unlock()
  1080. now := time.Now().UTC()
  1081. updateTime := now.Add(-SpotRefreshDuration)
  1082. // Return if there was an update time set and an hour hasn't elapsed
  1083. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  1084. return
  1085. }
  1086. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
  1087. if err != nil {
  1088. log.Warnf("Skipping AWS spot data download: %s", err.Error())
  1089. aws.SpotPricingError = err
  1090. return
  1091. }
  1092. aws.SpotPricingError = nil
  1093. // update time last updated
  1094. aws.SpotPricingUpdatedAt = &now
  1095. aws.SpotPricingByInstanceID = sp
  1096. }
  1097. func (aws *AWS) initializeSpotPriceHistoryCache() (*SpotPriceHistoryCache, error) {
  1098. log.Info("Initializing AWS Spot Price History Manager")
  1099. // Get AWS access key for creating config
  1100. accessKey, err := aws.GetAWSAccessKey()
  1101. if err != nil {
  1102. return nil, fmt.Errorf("getting AWS access key for spot price history: %w", err)
  1103. }
  1104. // Use the cluster region to create the initial AWS config and credentials.
  1105. // The SpotPriceHistoryFetcher itself can query multiple regions by creating
  1106. // region-specific EC2 clients as needed.
  1107. if aws.ClusterRegion == "" {
  1108. return nil, fmt.Errorf("no cluster region configured")
  1109. }
  1110. // Create config for the cluster region
  1111. awsConfig, err := accessKey.CreateConfig(aws.ClusterRegion)
  1112. if err != nil {
  1113. return nil, fmt.Errorf("creating AWS config for spot price history: %w", err)
  1114. }
  1115. return NewSpotPriceHistoryCache(NewAWSSpotPriceHistoryFetcher(awsConfig)), nil
  1116. }
  1117. func (aws *AWS) spotPricingFromHistory(k models.Key) (*SpotPriceHistoryEntry, bool) {
  1118. if aws.SpotPriceHistoryCache == nil {
  1119. return nil, false
  1120. }
  1121. // Extract region, instance type, and availability zone from the key
  1122. awsKey, ok := k.(*awsKey)
  1123. if !ok {
  1124. log.DedupedWarningf(10, "Failed to cast key to awsKey for spot price history lookup: %s", k.ID())
  1125. return nil, false
  1126. }
  1127. region, regionOk := util.GetRegion(awsKey.Labels)
  1128. instanceType, instanceTypeOk := util.GetInstanceType(awsKey.Labels)
  1129. availabilityZone, availabilityZoneOk := util.GetZone(awsKey.Labels)
  1130. // Skip lookup if any required information is missing
  1131. if !regionOk || !instanceTypeOk || !availabilityZoneOk {
  1132. log.DedupedWarningf(10, "Missing required info for spot price history lookup (region: %s, instanceType: %s, zone: %s): %s", region, instanceType, availabilityZone, k.ID())
  1133. return nil, false
  1134. }
  1135. price, err := aws.SpotPriceHistoryCache.GetSpotPrice(region, instanceType, availabilityZone)
  1136. if err != nil {
  1137. log.DedupedWarningf(10, "Failed to get spot price history for instance %s: %s", k.ID(), err.Error())
  1138. return nil, false
  1139. }
  1140. return price, true
  1141. }
  1142. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  1143. func (aws *AWS) NetworkPricing() (*models.Network, error) {
  1144. cpricing, err := aws.Config.GetCustomPricingData()
  1145. if err != nil {
  1146. return nil, err
  1147. }
  1148. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  1149. if err != nil {
  1150. return nil, err
  1151. }
  1152. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  1153. if err != nil {
  1154. return nil, err
  1155. }
  1156. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  1157. if err != nil {
  1158. return nil, err
  1159. }
  1160. nge, err := strconv.ParseFloat(cpricing.NatGatewayEgress, 64)
  1161. if err != nil {
  1162. return nil, err
  1163. }
  1164. ngi, err := strconv.ParseFloat(cpricing.NatGatewayIngress, 64)
  1165. if err != nil {
  1166. return nil, err
  1167. }
  1168. return &models.Network{
  1169. ZoneNetworkEgressCost: znec,
  1170. RegionNetworkEgressCost: rnec,
  1171. InternetNetworkEgressCost: inec,
  1172. NatGatewayEgressCost: nge,
  1173. NatGatewayIngressCost: ngi,
  1174. }, nil
  1175. }
  1176. func (aws *AWS) LoadBalancerPricing() (*models.LoadBalancer, error) {
  1177. // TODO: determine key based on function arguments
  1178. // this is something that should be changed in the Provider interface
  1179. key := aws.ClusterRegion + ",LoadBalancerUsage"
  1180. // set default price
  1181. hourlyCost := 0.025
  1182. // use price index when available
  1183. if terms, ok := aws.Pricing[key]; ok {
  1184. hourlyCost = terms.LoadBalancer.Cost
  1185. }
  1186. return &models.LoadBalancer{
  1187. Cost: hourlyCost,
  1188. }, nil
  1189. }
  1190. // AllNodePricing returns all the billing data fetched.
  1191. func (aws *AWS) AllNodePricing() (interface{}, error) {
  1192. aws.DownloadPricingDataLock.RLock()
  1193. defer aws.DownloadPricingDataLock.RUnlock()
  1194. return aws.Pricing, nil
  1195. }
  1196. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  1197. aws.SpotPricingLock.RLock()
  1198. defer aws.SpotPricingLock.RUnlock()
  1199. info, ok := aws.SpotPricingByInstanceID[instanceID]
  1200. return info, ok
  1201. }
  1202. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  1203. aws.RIDataLock.RLock()
  1204. defer aws.RIDataLock.RUnlock()
  1205. data, ok := aws.RIPricingByInstanceID[instanceID]
  1206. return data, ok
  1207. }
  1208. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  1209. aws.SavingsPlanDataLock.RLock()
  1210. defer aws.SavingsPlanDataLock.RUnlock()
  1211. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  1212. return data, ok
  1213. }
  1214. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Key) (*models.Node, models.PricingMetadata, error) {
  1215. key := k.Features()
  1216. meta := models.PricingMetadata{}
  1217. var cost string
  1218. publicPricingFound := true
  1219. c, ok := terms.OnDemand.PriceDimensions[strings.Join([]string{terms.Sku, terms.OnDemand.OfferTermCode, HourlyRateCode}, ".")]
  1220. if ok {
  1221. cost = c.PricePerUnit.USD
  1222. } else {
  1223. // Check for Chinese pricing
  1224. c, ok = terms.OnDemand.PriceDimensions[strings.Join([]string{terms.Sku, terms.OnDemand.OfferTermCode, HourlyRateCodeCn}, ".")]
  1225. if ok {
  1226. cost = c.PricePerUnit.CNY
  1227. } else {
  1228. publicPricingFound = false
  1229. }
  1230. }
  1231. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  1232. var spotcost string
  1233. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  1234. arr := strings.Split(spotInfo.Charge, " ")
  1235. if len(arr) == 2 {
  1236. spotcost = arr[0]
  1237. } else {
  1238. log.Infof("Spot data for node %s is missing", k.ID())
  1239. }
  1240. return &models.Node{
  1241. Cost: spotcost,
  1242. VCPU: terms.VCpu,
  1243. RAM: terms.Memory,
  1244. GPU: terms.GPU,
  1245. Storage: terms.Storage,
  1246. BaseCPUPrice: aws.BaseCPUPrice,
  1247. BaseRAMPrice: aws.BaseRAMPrice,
  1248. BaseGPUPrice: aws.BaseGPUPrice,
  1249. UsageType: PreemptibleType,
  1250. }, meta, nil
  1251. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  1252. log.DedupedWarningf(5, "Node %s marked preemptible but no spot feed data available; falling back to other pricing sources", k.ID())
  1253. // Try to get spot pricing from DescribeSpotPriceHistory API
  1254. if historyEntry, ok := aws.spotPricingFromHistory(k); ok {
  1255. log.DedupedInfof(5, "Using spot price history data for node %s: $%f", k.ID(), historyEntry.SpotPrice)
  1256. spotHistoryCost := fmt.Sprintf("%f", historyEntry.SpotPrice)
  1257. meta.Source = SpotPriceHistorySource
  1258. return &models.Node{
  1259. Cost: spotHistoryCost,
  1260. VCPU: terms.VCpu,
  1261. RAM: terms.Memory,
  1262. GPU: terms.GPU,
  1263. Storage: terms.Storage,
  1264. BaseCPUPrice: aws.BaseCPUPrice,
  1265. BaseRAMPrice: aws.BaseRAMPrice,
  1266. BaseGPUPrice: aws.BaseGPUPrice,
  1267. UsageType: PreemptibleType,
  1268. }, meta, nil
  1269. }
  1270. if publicPricingFound {
  1271. // return public price if found
  1272. log.DedupedWarningf(5, "No spot price history available for %s, falling back to on-demand pricing", k.ID())
  1273. return &models.Node{
  1274. Cost: cost,
  1275. VCPU: terms.VCpu,
  1276. RAM: terms.Memory,
  1277. GPU: terms.GPU,
  1278. Storage: terms.Storage,
  1279. BaseCPUPrice: aws.BaseCPUPrice,
  1280. BaseRAMPrice: aws.BaseRAMPrice,
  1281. BaseGPUPrice: aws.BaseGPUPrice,
  1282. UsageType: PreemptibleType,
  1283. }, meta, nil
  1284. } else {
  1285. // return defaults if public pricing not found
  1286. log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
  1287. return &models.Node{
  1288. VCPU: terms.VCpu,
  1289. VCPUCost: aws.BaseSpotCPUPrice,
  1290. RAMCost: aws.BaseSpotRAMPrice,
  1291. RAM: terms.Memory,
  1292. GPU: terms.GPU,
  1293. Storage: terms.Storage,
  1294. BaseCPUPrice: aws.BaseCPUPrice,
  1295. BaseRAMPrice: aws.BaseRAMPrice,
  1296. BaseGPUPrice: aws.BaseGPUPrice,
  1297. UsageType: PreemptibleType,
  1298. }, meta, nil
  1299. }
  1300. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  1301. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  1302. return &models.Node{
  1303. Cost: strCost,
  1304. VCPU: terms.VCpu,
  1305. RAM: terms.Memory,
  1306. GPU: terms.GPU,
  1307. Storage: terms.Storage,
  1308. BaseCPUPrice: aws.BaseCPUPrice,
  1309. BaseRAMPrice: aws.BaseRAMPrice,
  1310. BaseGPUPrice: aws.BaseGPUPrice,
  1311. UsageType: usageType,
  1312. }, meta, nil
  1313. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  1314. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  1315. return &models.Node{
  1316. Cost: strCost,
  1317. VCPU: terms.VCpu,
  1318. RAM: terms.Memory,
  1319. GPU: terms.GPU,
  1320. Storage: terms.Storage,
  1321. BaseCPUPrice: aws.BaseCPUPrice,
  1322. BaseRAMPrice: aws.BaseRAMPrice,
  1323. BaseGPUPrice: aws.BaseGPUPrice,
  1324. UsageType: usageType,
  1325. }, meta, nil
  1326. }
  1327. // Throw error if public price is not found
  1328. if !publicPricingFound {
  1329. return nil, meta, fmt.Errorf("for node \"%s\", cannot find the following key in OnDemand pricing data \"%s\"", k.ID(), k.Features())
  1330. }
  1331. return &models.Node{
  1332. Cost: cost,
  1333. VCPU: terms.VCpu,
  1334. RAM: terms.Memory,
  1335. GPU: terms.GPU,
  1336. Storage: terms.Storage,
  1337. BaseCPUPrice: aws.BaseCPUPrice,
  1338. BaseRAMPrice: aws.BaseRAMPrice,
  1339. BaseGPUPrice: aws.BaseGPUPrice,
  1340. UsageType: usageType,
  1341. }, meta, nil
  1342. }
  1343. func (aws *AWS) getFargatePod(awsKey *awsKey) (*clustercache.Pod, bool) {
  1344. pods := aws.Clientset.GetAllPods()
  1345. for _, pod := range pods {
  1346. if pod.Spec.NodeName == awsKey.Name {
  1347. return pod, true
  1348. }
  1349. }
  1350. return nil, false
  1351. }
  1352. const (
  1353. nodeOSLabel = "kubernetes.io/os"
  1354. nodeArchLabel = "kubernetes.io/arch"
  1355. fargatePodCapacityAnnotation = "CapacityProvisioned"
  1356. )
  1357. // e.g. "0.25vCPU 0.5GB"
  1358. var fargatePodCapacityRegex = regexp.MustCompile("^([0-9.]+)vCPU ([0-9.]+)GB$")
  1359. func (aws *AWS) createFargateNode(awsKey *awsKey, usageType string) (*models.Node, models.PricingMetadata, error) {
  1360. if aws.FargatePricing == nil {
  1361. return nil, models.PricingMetadata{}, fmt.Errorf("fargate pricing not initialized")
  1362. }
  1363. pod, ok := aws.getFargatePod(awsKey)
  1364. if !ok {
  1365. return nil, models.PricingMetadata{}, fmt.Errorf("could not find pod for fargate node %s", awsKey.Name)
  1366. }
  1367. capacity := pod.Annotations[fargatePodCapacityAnnotation]
  1368. match := fargatePodCapacityRegex.FindStringSubmatch(capacity)
  1369. if len(match) == 0 {
  1370. return nil, models.PricingMetadata{}, fmt.Errorf("could not parse pod capacity for fargate node %s", awsKey.Name)
  1371. }
  1372. vCPU, err := strconv.ParseFloat(match[1], 64)
  1373. if err != nil {
  1374. return nil, models.PricingMetadata{}, fmt.Errorf("could not parse vCPU capacity for fargate node %s: %v", awsKey.Name, err)
  1375. }
  1376. memory, err := strconv.ParseFloat(match[2], 64)
  1377. if err != nil {
  1378. return nil, models.PricingMetadata{}, fmt.Errorf("could not parse memory capacity for fargate node %s: %v", awsKey.Name, err)
  1379. }
  1380. region, ok := util.GetRegion(awsKey.Labels)
  1381. if !ok {
  1382. return nil, models.PricingMetadata{}, fmt.Errorf("could not get region for fargate node %s", awsKey.Name)
  1383. }
  1384. nodeOS := awsKey.Labels[nodeOSLabel]
  1385. nodeArch := awsKey.Labels[nodeArchLabel]
  1386. hourlyCPU, hourlyRAM, err := aws.FargatePricing.GetHourlyPricing(region, nodeOS, nodeArch)
  1387. if err != nil {
  1388. return nil, models.PricingMetadata{}, fmt.Errorf("could not get hourly pricing for fargate node %s: %v", awsKey.Name, err)
  1389. }
  1390. cost := hourlyCPU*vCPU + hourlyRAM*memory
  1391. return &models.Node{
  1392. Cost: strconv.FormatFloat(cost, 'f', -1, 64),
  1393. VCPU: strconv.FormatFloat(vCPU, 'f', -1, 64),
  1394. RAM: strconv.FormatFloat(memory, 'f', -1, 64),
  1395. RAMBytes: strconv.FormatFloat(memory*1024*1024*1024, 'f', -1, 64),
  1396. VCPUCost: strconv.FormatFloat(hourlyCPU, 'f', -1, 64),
  1397. RAMCost: strconv.FormatFloat(hourlyRAM, 'f', -1, 64),
  1398. BaseCPUPrice: aws.BaseCPUPrice,
  1399. BaseRAMPrice: aws.BaseRAMPrice,
  1400. BaseGPUPrice: aws.BaseGPUPrice,
  1401. UsageType: usageType,
  1402. }, models.PricingMetadata{}, nil
  1403. }
  1404. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  1405. func (aws *AWS) NodePricing(k models.Key) (*models.Node, models.PricingMetadata, error) {
  1406. aws.DownloadPricingDataLock.RLock()
  1407. defer aws.DownloadPricingDataLock.RUnlock()
  1408. key := k.Features()
  1409. usageType := "ondemand"
  1410. if aws.isPreemptible(key) {
  1411. usageType = PreemptibleType
  1412. }
  1413. meta := models.PricingMetadata{}
  1414. terms, ok := aws.Pricing[key]
  1415. if termsStr, err := json.Marshal(terms); err == nil {
  1416. log.Debugf("NodePricing: for key \"%s\" found the following OnDemand data: %s", key, string(termsStr))
  1417. }
  1418. if ok {
  1419. return aws.createNode(terms, usageType, k)
  1420. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  1421. aws.DownloadPricingDataLock.RUnlock()
  1422. err := aws.DownloadPricingData()
  1423. aws.DownloadPricingDataLock.RLock()
  1424. if err != nil {
  1425. return &models.Node{
  1426. Cost: aws.BaseCPUPrice,
  1427. BaseCPUPrice: aws.BaseCPUPrice,
  1428. BaseRAMPrice: aws.BaseRAMPrice,
  1429. BaseGPUPrice: aws.BaseGPUPrice,
  1430. UsageType: usageType,
  1431. UsesBaseCPUPrice: true,
  1432. }, meta, err
  1433. }
  1434. terms, termsOk := aws.Pricing[key]
  1435. if !termsOk {
  1436. return &models.Node{
  1437. Cost: aws.BaseCPUPrice,
  1438. BaseCPUPrice: aws.BaseCPUPrice,
  1439. BaseRAMPrice: aws.BaseRAMPrice,
  1440. BaseGPUPrice: aws.BaseGPUPrice,
  1441. UsageType: usageType,
  1442. UsesBaseCPUPrice: true,
  1443. }, meta, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  1444. }
  1445. return aws.createNode(terms, usageType, k)
  1446. } else if awsKey, ok := k.(*awsKey); ok && awsKey.isFargateNode() {
  1447. // Since Fargate pricing is listed at AmazonECS and is different from AmazonEC2, we handle it separately here
  1448. return aws.createFargateNode(awsKey, usageType)
  1449. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  1450. // we seem to have an issue where this error gets thrown during app start.
  1451. // somehow the ValidPricingKeys map is being accessed before all the pricing data has been downloaded
  1452. return nil, meta, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  1453. }
  1454. }
  1455. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  1456. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  1457. c, err := awsProvider.GetConfig()
  1458. if err != nil {
  1459. return nil, err
  1460. }
  1461. const defaultClusterName = "AWS Cluster #1"
  1462. // Determine cluster name
  1463. clusterName := c.ClusterName
  1464. if clusterName == "" {
  1465. awsClusterID := env.GetAWSClusterID()
  1466. if awsClusterID != "" {
  1467. log.Infof("Returning \"%s\" as ClusterName", awsClusterID)
  1468. clusterName = awsClusterID
  1469. log.Warnf("Warning - %s will be deprecated in a future release. Use %s instead", env.AWSClusterIDEnvVar, coreenv.ClusterIDEnvVar)
  1470. } else if clusterName = coreenv.GetClusterID(); clusterName != "" {
  1471. log.DedupedInfof(5, "Setting cluster name to %s from %s ", clusterName, coreenv.ClusterIDEnvVar)
  1472. } else {
  1473. clusterName = defaultClusterName
  1474. log.DedupedWarningf(5, "Unable to detect cluster name - using default of %s", defaultClusterName)
  1475. log.DedupedWarningf(5, "Please set cluster name through configmap or via %s env var", coreenv.ClusterIDEnvVar)
  1476. }
  1477. }
  1478. // this value requires configuration but is unavailable else where
  1479. clusterAccountID := c.ClusterAccountID
  1480. // Use AthenaProjectID if Cluster Account is not set to support older configs
  1481. if clusterAccountID == "" {
  1482. clusterAccountID = c.AthenaProjectID
  1483. }
  1484. m := make(map[string]string)
  1485. m["name"] = clusterName
  1486. m["provider"] = opencost.AWSProvider
  1487. m["account"] = clusterAccountID
  1488. m["region"] = awsProvider.ClusterRegion
  1489. m["id"] = coreenv.GetClusterID()
  1490. m["remoteReadEnabled"] = strconv.FormatBool(env.IsRemoteEnabled())
  1491. m["provisioner"] = awsProvider.clusterProvisioner
  1492. return m, nil
  1493. }
  1494. // updates the authentication to the latest values (via config or secret)
  1495. func (aws *AWS) ConfigureAuth() error {
  1496. c, err := aws.Config.GetCustomPricingData()
  1497. if err != nil {
  1498. log.Errorf("Error downloading default pricing data: %s", err.Error())
  1499. }
  1500. return aws.ConfigureAuthWith(c)
  1501. }
  1502. // updates the authentication to the latest values (via config or secret)
  1503. func (aws *AWS) ConfigureAuthWith(config *models.CustomPricing) error {
  1504. accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
  1505. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1506. err := coreenv.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1507. if err != nil {
  1508. return err
  1509. }
  1510. err = coreenv.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1511. if err != nil {
  1512. return err
  1513. }
  1514. }
  1515. return nil
  1516. }
  1517. // Gets the aws key id and secret
  1518. func (aws *AWS) getAWSAuth(forceReload bool, cp *models.CustomPricing) (string, string) {
  1519. // 1. Check config values first (set from frontend UI)
  1520. if cp.AwsServiceKeyName != "" && cp.AwsServiceKeySecret != "" {
  1521. aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
  1522. Message: "AWS ServiceKey exists",
  1523. Status: true,
  1524. })
  1525. return cp.AwsServiceKeyName, cp.AwsServiceKeySecret
  1526. }
  1527. // 2. Check for secret
  1528. s, _ := aws.loadAWSAuthSecret(forceReload)
  1529. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1530. aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
  1531. Message: "AWS ServiceKey exists",
  1532. Status: true,
  1533. })
  1534. return s.AccessKeyID, s.SecretAccessKey
  1535. }
  1536. // 3. Fall back to env vars
  1537. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeySecret() == "" {
  1538. aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
  1539. Message: "AWS ServiceKey exists",
  1540. Status: false,
  1541. })
  1542. } else {
  1543. aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
  1544. Message: "AWS ServiceKey exists",
  1545. Status: true,
  1546. })
  1547. }
  1548. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1549. }
  1550. // Load once and cache the result (even on failure). This is an install time secret, so
  1551. // we don't expect the secret to change. If it does, however, we can force reload using
  1552. // the input parameter.
  1553. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1554. if !force && loadedAWSSecret {
  1555. return awsSecret, nil
  1556. }
  1557. loadedAWSSecret = true
  1558. exists, err := fileutil.FileExists(models.AuthSecretPath)
  1559. if !exists || err != nil {
  1560. return nil, fmt.Errorf("Failed to locate service account file: %s", models.AuthSecretPath)
  1561. }
  1562. result, err := os.ReadFile(models.AuthSecretPath)
  1563. if err != nil {
  1564. return nil, err
  1565. }
  1566. var ak AWSAccessKey
  1567. err = json.Unmarshal(result, &ak)
  1568. if err != nil {
  1569. return nil, err
  1570. }
  1571. awsSecret = &ak
  1572. return awsSecret, nil
  1573. }
  1574. func (aws *AWS) getAddressesForRegion(ctx context.Context, region string) (*ec2.DescribeAddressesOutput, error) {
  1575. aak, err := aws.GetAWSAccessKey()
  1576. if err != nil {
  1577. return nil, err
  1578. }
  1579. cfg, err := aak.CreateConfig(region)
  1580. if err != nil {
  1581. return nil, err
  1582. }
  1583. cli := ec2.NewFromConfig(cfg)
  1584. return cli.DescribeAddresses(ctx, &ec2.DescribeAddressesInput{})
  1585. }
  1586. func (aws *AWS) getAllAddresses() ([]*ec2Types.Address, error) {
  1587. aws.ConfigureAuth() // load authentication data into env vars
  1588. regions := aws.Regions()
  1589. addressCh := make(chan *ec2.DescribeAddressesOutput, len(regions))
  1590. errorCh := make(chan error, len(regions))
  1591. var wg sync.WaitGroup
  1592. wg.Add(len(regions))
  1593. // Get volumes from each AWS region
  1594. for _, r := range regions {
  1595. region := r // make a copy of r to avoid capturing loop variable
  1596. // Fetch IP address response and send results and errors to their
  1597. // respective channels
  1598. go func() {
  1599. defer wg.Done()
  1600. defer errs.HandlePanic()
  1601. // Query for first page of volume results
  1602. resp, err := aws.getAddressesForRegion(context.TODO(), region)
  1603. if err != nil {
  1604. var awsErr smithy.APIError
  1605. if errors.As(err, &awsErr) {
  1606. switch awsErr.ErrorCode() {
  1607. case "AuthFailure", "InvalidClientTokenId", "UnauthorizedOperation":
  1608. log.DedupedInfof(5, "Unable to get addresses for region %s due to AWS permissions, error message: %s", region, awsErr.ErrorMessage())
  1609. return
  1610. default:
  1611. errorCh <- err
  1612. return
  1613. }
  1614. } else {
  1615. errorCh <- err
  1616. return
  1617. }
  1618. }
  1619. addressCh <- resp
  1620. }()
  1621. }
  1622. // Close the result channels after everything has been sent
  1623. go func() {
  1624. defer errs.HandlePanic()
  1625. wg.Wait()
  1626. close(errorCh)
  1627. close(addressCh)
  1628. }()
  1629. var addresses []*ec2Types.Address
  1630. for adds := range addressCh {
  1631. for _, add := range adds.Addresses {
  1632. a := add // duplicate to avoid pointer to iterator
  1633. addresses = append(addresses, &a)
  1634. }
  1635. }
  1636. var errs []error
  1637. for err := range errorCh {
  1638. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1639. errs = append(errs, err)
  1640. }
  1641. // Return error if no addresses are returned
  1642. if len(errs) > 0 && len(addresses) == 0 {
  1643. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errs), errs)
  1644. }
  1645. return addresses, nil
  1646. }
  1647. // GetAddresses retrieves EC2 addresses
  1648. func (aws *AWS) GetAddresses() ([]byte, error) {
  1649. addresses, err := aws.getAllAddresses()
  1650. if err != nil {
  1651. return nil, err
  1652. }
  1653. // Format the response this way to match the JSON-encoded formatting of a single response
  1654. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1655. // a "Addresss" key at the top level.
  1656. return json.Marshal(map[string][]*ec2Types.Address{
  1657. "Addresses": addresses,
  1658. })
  1659. }
  1660. func (aws *AWS) isAddressOrphaned(address *ec2Types.Address) bool {
  1661. if address.AssociationId != nil {
  1662. return false
  1663. }
  1664. return true
  1665. }
  1666. func (aws *AWS) getDisksForRegion(ctx context.Context, region string, maxResults int32, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1667. aak, err := aws.GetAWSAccessKey()
  1668. if err != nil {
  1669. return nil, err
  1670. }
  1671. cfg, err := aak.CreateConfig(region)
  1672. if err != nil {
  1673. return nil, err
  1674. }
  1675. cli := ec2.NewFromConfig(cfg)
  1676. return cli.DescribeVolumes(ctx, &ec2.DescribeVolumesInput{
  1677. MaxResults: &maxResults,
  1678. NextToken: nextToken,
  1679. })
  1680. }
  1681. func (aws *AWS) getAllDisks() ([]*ec2Types.Volume, error) {
  1682. aws.ConfigureAuth() // load authentication data into env vars
  1683. regions := aws.Regions()
  1684. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(regions))
  1685. errorCh := make(chan error, len(regions))
  1686. var wg sync.WaitGroup
  1687. wg.Add(len(regions))
  1688. // Get volumes from each AWS region
  1689. for _, r := range regions {
  1690. // Fetch volume response and send results and errors to their
  1691. // respective channels
  1692. go func(region string) {
  1693. defer wg.Done()
  1694. defer errs.HandlePanic()
  1695. // Query for first page of volume results
  1696. resp, err := aws.getDisksForRegion(context.TODO(), region, 1000, nil)
  1697. if err != nil {
  1698. var awsErr smithy.APIError
  1699. if errors.As(err, &awsErr) {
  1700. switch awsErr.ErrorCode() {
  1701. case "AuthFailure", "InvalidClientTokenId", "UnauthorizedOperation":
  1702. log.DedupedInfof(5, "Unable to get disks for region %s due to AWS permissions, error message: %s", region, awsErr.ErrorMessage())
  1703. return
  1704. default:
  1705. errorCh <- err
  1706. return
  1707. }
  1708. } else {
  1709. errorCh <- err
  1710. return
  1711. }
  1712. }
  1713. volumeCh <- resp
  1714. // A NextToken indicates more pages of results. Keep querying
  1715. // until all pages are retrieved.
  1716. for resp.NextToken != nil {
  1717. resp, err = aws.getDisksForRegion(context.TODO(), region, 100, resp.NextToken)
  1718. if err != nil {
  1719. errorCh <- err
  1720. return
  1721. }
  1722. volumeCh <- resp
  1723. }
  1724. }(r)
  1725. }
  1726. // Close the result channels after everything has been sent
  1727. go func() {
  1728. defer errs.HandlePanic()
  1729. wg.Wait()
  1730. close(errorCh)
  1731. close(volumeCh)
  1732. }()
  1733. var volumes []*ec2Types.Volume
  1734. for vols := range volumeCh {
  1735. for _, vol := range vols.Volumes {
  1736. v := vol // duplicate to avoid pointer to iterator
  1737. volumes = append(volumes, &v)
  1738. }
  1739. }
  1740. var errs []error
  1741. for err := range errorCh {
  1742. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1743. errs = append(errs, err)
  1744. }
  1745. // Return error if no volumes are returned
  1746. if len(errs) > 0 && len(volumes) == 0 {
  1747. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errs), errs)
  1748. }
  1749. return volumes, nil
  1750. }
  1751. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1752. func (aws *AWS) GetDisks() ([]byte, error) {
  1753. volumes, err := aws.getAllDisks()
  1754. if err != nil {
  1755. return nil, err
  1756. }
  1757. // Format the response this way to match the JSON-encoded formatting of a single response
  1758. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1759. // a "Volumes" key at the top level.
  1760. return json.Marshal(map[string][]*ec2Types.Volume{
  1761. "Volumes": volumes,
  1762. })
  1763. }
  1764. func (aws *AWS) isDiskOrphaned(vol *ec2Types.Volume) bool {
  1765. // Do not consider volume orphaned if in use
  1766. if vol.State == InUseState {
  1767. return false
  1768. }
  1769. // Do not consider volume orphaned if volume is attached to any attachments
  1770. if len(vol.Attachments) != 0 {
  1771. for _, attachment := range vol.Attachments {
  1772. if attachment.State == AttachedState {
  1773. return false
  1774. }
  1775. }
  1776. }
  1777. return true
  1778. }
  1779. func (aws *AWS) GetOrphanedResources() ([]models.OrphanedResource, error) {
  1780. volumes, volumesErr := aws.getAllDisks()
  1781. addresses, addressesErr := aws.getAllAddresses()
  1782. // If we have any orphaned resources - prioritize returning them over returning errors
  1783. if len(addresses) == 0 && len(volumes) == 0 {
  1784. if volumesErr != nil {
  1785. return nil, volumesErr
  1786. }
  1787. if addressesErr != nil {
  1788. return nil, addressesErr
  1789. }
  1790. }
  1791. var orphanedResources []models.OrphanedResource
  1792. for _, volume := range volumes {
  1793. if aws.isDiskOrphaned(volume) {
  1794. cost, err := aws.findCostForDisk(volume)
  1795. if err != nil {
  1796. return nil, err
  1797. }
  1798. var volumeSize int64
  1799. if volume.Size != nil {
  1800. volumeSize = int64(*volume.Size)
  1801. }
  1802. // This is turning us-east-1a into us-east-1
  1803. var zone string
  1804. if volume.AvailabilityZone != nil {
  1805. zone = *volume.AvailabilityZone
  1806. }
  1807. var region, url string
  1808. region = regionRx.FindString(zone)
  1809. if region != "" {
  1810. url = "https://console.aws.amazon.com/ec2/home?region=" + region + "#Volumes:sort=desc:createTime"
  1811. } else {
  1812. url = "https://console.aws.amazon.com/ec2/home?#Volumes:sort=desc:createTime"
  1813. }
  1814. // output tags as desc
  1815. tags := map[string]string{}
  1816. for _, tag := range volume.Tags {
  1817. tags[*tag.Key] = *tag.Value
  1818. }
  1819. or := models.OrphanedResource{
  1820. Kind: "disk",
  1821. Region: zone,
  1822. Size: &volumeSize,
  1823. DiskName: *volume.VolumeId,
  1824. Url: url,
  1825. MonthlyCost: cost,
  1826. Description: tags,
  1827. }
  1828. orphanedResources = append(orphanedResources, or)
  1829. }
  1830. }
  1831. for _, address := range addresses {
  1832. if aws.isAddressOrphaned(address) {
  1833. cost := AWSHourlyPublicIPCost * timeutil.HoursPerMonth
  1834. desc := map[string]string{}
  1835. for _, tag := range address.Tags {
  1836. if tag.Key == nil {
  1837. continue
  1838. }
  1839. if tag.Value == nil {
  1840. desc[*tag.Key] = ""
  1841. } else {
  1842. desc[*tag.Key] = *tag.Value
  1843. }
  1844. }
  1845. or := models.OrphanedResource{
  1846. Kind: "address",
  1847. Address: *address.PublicIp,
  1848. Description: desc,
  1849. Url: "http://console.aws.amazon.com/ec2/home?#Addresses",
  1850. MonthlyCost: &cost,
  1851. }
  1852. orphanedResources = append(orphanedResources, or)
  1853. }
  1854. }
  1855. return orphanedResources, nil
  1856. }
  1857. func (aws *AWS) findCostForDisk(disk *ec2Types.Volume) (*float64, error) {
  1858. // todo: use AWS pricing from all regions
  1859. if disk.AvailabilityZone == nil {
  1860. return nil, fmt.Errorf("nil region")
  1861. }
  1862. if disk.Size == nil {
  1863. return nil, fmt.Errorf("nil disk size")
  1864. }
  1865. class := volTypes[string(disk.VolumeType)]
  1866. key := aws.ClusterRegion + "," + class
  1867. pricing, ok := aws.Pricing[key]
  1868. if !ok {
  1869. return nil, fmt.Errorf("no pricing data for key '%s'", key)
  1870. }
  1871. if pricing == nil {
  1872. return nil, fmt.Errorf("nil pricing data for key '%s'", key)
  1873. }
  1874. if pricing.PV == nil {
  1875. return nil, fmt.Errorf("pricing for key '%s' has nil PV", key)
  1876. }
  1877. priceStr := pricing.PV.Cost
  1878. price, err := strconv.ParseFloat(priceStr, 64)
  1879. if err != nil {
  1880. return nil, err
  1881. }
  1882. cost := price * timeutil.HoursPerMonth * float64(*disk.Size)
  1883. return &cost, nil
  1884. }
  1885. // QueryAthenaPaginated executes athena query and processes results.
  1886. func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
  1887. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  1888. if err != nil {
  1889. return err
  1890. }
  1891. if awsAthenaInfo.AthenaDatabase == "" || awsAthenaInfo.AthenaTable == "" || awsAthenaInfo.AthenaRegion == "" ||
  1892. awsAthenaInfo.AthenaBucketName == "" || awsAthenaInfo.AccountID == "" {
  1893. return fmt.Errorf("QueryAthenaPaginated: athena configuration incomplete")
  1894. }
  1895. queryExecutionCtx := &athenaTypes.QueryExecutionContext{
  1896. Database: awsSDK.String(awsAthenaInfo.AthenaDatabase),
  1897. }
  1898. if awsAthenaInfo.AthenaCatalog != "" {
  1899. queryExecutionCtx.Catalog = awsSDK.String(awsAthenaInfo.AthenaCatalog)
  1900. }
  1901. resultConfiguration := &athenaTypes.ResultConfiguration{
  1902. OutputLocation: awsSDK.String(awsAthenaInfo.AthenaBucketName),
  1903. }
  1904. startQueryExecutionInput := &athena.StartQueryExecutionInput{
  1905. QueryString: awsSDK.String(query),
  1906. QueryExecutionContext: queryExecutionCtx,
  1907. ResultConfiguration: resultConfiguration,
  1908. }
  1909. // Only set if there is a value, the default input is nil which defaults to the 'primary' workgroup
  1910. if awsAthenaInfo.AthenaWorkgroup != "" {
  1911. startQueryExecutionInput.WorkGroup = awsSDK.String(awsAthenaInfo.AthenaWorkgroup)
  1912. }
  1913. // Create Athena Client
  1914. cfg, err := awsAthenaInfo.CreateConfig()
  1915. if err != nil {
  1916. log.Errorf("Could not retrieve Athena Configuration: %s", err.Error())
  1917. }
  1918. cli := athena.NewFromConfig(cfg)
  1919. // Query Athena
  1920. startQueryExecutionOutput, err := cli.StartQueryExecution(ctx, startQueryExecutionInput)
  1921. if err != nil {
  1922. return fmt.Errorf("QueryAthenaPaginated: start query error: %s", err.Error())
  1923. }
  1924. err = waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
  1925. if err != nil {
  1926. return fmt.Errorf("QueryAthenaPaginated: query execution error: %s", err.Error())
  1927. }
  1928. queryResultsInput := &athena.GetQueryResultsInput{
  1929. QueryExecutionId: startQueryExecutionOutput.QueryExecutionId,
  1930. }
  1931. getQueryResultsPaginator := athena.NewGetQueryResultsPaginator(cli, queryResultsInput)
  1932. for getQueryResultsPaginator.HasMorePages() {
  1933. pg, err := getQueryResultsPaginator.NextPage(ctx)
  1934. if err != nil {
  1935. log.Errorf("QueryAthenaPaginated: NextPage error: %s", err.Error())
  1936. continue
  1937. }
  1938. fn(pg)
  1939. }
  1940. return nil
  1941. }
  1942. type SavingsPlanData struct {
  1943. ResourceID string
  1944. EffectiveCost float64
  1945. SavingsPlanARN string
  1946. MostRecentDate string
  1947. }
  1948. func (aws *AWS) GetSavingsPlanDataFromAthena() error {
  1949. cfg, err := aws.GetConfig()
  1950. if err != nil {
  1951. aws.RIPricingError = err
  1952. return err
  1953. }
  1954. if cfg.AthenaBucketName == "" {
  1955. err = ErrNoAthenaBucket
  1956. aws.RIPricingError = err
  1957. return err
  1958. }
  1959. if aws.SavingsPlanDataByInstanceID == nil {
  1960. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1961. }
  1962. tNow := time.Now()
  1963. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1964. start := tOneDayAgo.Format("2006-01-02")
  1965. end := tNow.Format("2006-01-02")
  1966. // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
  1967. //
  1968. q := `SELECT
  1969. line_item_usage_start_date,
  1970. savings_plan_savings_plan_a_r_n,
  1971. line_item_resource_id,
  1972. savings_plan_savings_plan_rate
  1973. FROM %s as cost_data
  1974. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1975. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1976. line_item_usage_start_date DESC`
  1977. page := 0
  1978. mostRecentDate := ""
  1979. processResults := func(op *athena.GetQueryResultsOutput) bool {
  1980. if op == nil {
  1981. log.Errorf("GetSavingsPlanDataFromAthena: Athena page is nil")
  1982. return false
  1983. } else if op.ResultSet == nil {
  1984. log.Errorf("GetSavingsPlanDataFromAthena: Athena page.ResultSet is nil")
  1985. return false
  1986. }
  1987. aws.SavingsPlanDataLock.Lock()
  1988. defer aws.SavingsPlanDataLock.Unlock()
  1989. if page == 0 {
  1990. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
  1991. }
  1992. iter := op.ResultSet.Rows
  1993. if page == 0 && len(iter) > 0 {
  1994. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1995. }
  1996. page++
  1997. for _, r := range iter {
  1998. d := *r.Data[0].VarCharValue
  1999. if mostRecentDate == "" {
  2000. mostRecentDate = d
  2001. } else if mostRecentDate != d { // Get all most recent assignments
  2002. break
  2003. }
  2004. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  2005. if err != nil {
  2006. log.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  2007. }
  2008. r := &SavingsPlanData{
  2009. ResourceID: *r.Data[2].VarCharValue,
  2010. EffectiveCost: cost,
  2011. SavingsPlanARN: *r.Data[1].VarCharValue,
  2012. MostRecentDate: d,
  2013. }
  2014. aws.SavingsPlanDataByInstanceID[r.ResourceID] = r
  2015. }
  2016. log.Debugf("Found %d savings plan applied instances", len(aws.SavingsPlanDataByInstanceID))
  2017. for k, r := range aws.SavingsPlanDataByInstanceID {
  2018. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  2019. }
  2020. return true
  2021. }
  2022. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  2023. log.Debugf("Running Query: %s", query)
  2024. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  2025. if err != nil {
  2026. aws.RIPricingError = err
  2027. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  2028. }
  2029. return nil
  2030. }
  2031. type RIData struct {
  2032. ResourceID string
  2033. EffectiveCost float64
  2034. ReservationARN string
  2035. MostRecentDate string
  2036. }
  2037. func (aws *AWS) GetReservationDataFromAthena() error {
  2038. cfg, err := aws.GetConfig()
  2039. if err != nil {
  2040. aws.RIPricingError = err
  2041. return err
  2042. }
  2043. if cfg.AthenaBucketName == "" {
  2044. err = ErrNoAthenaBucket
  2045. aws.RIPricingError = err
  2046. return err
  2047. }
  2048. // Query for all column names in advance in order to validate configured
  2049. // label columns
  2050. columns, _ := aws.fetchColumns()
  2051. if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
  2052. err = fmt.Errorf("no reservation data available in Athena")
  2053. aws.RIPricingError = err
  2054. return err
  2055. }
  2056. if aws.RIPricingByInstanceID == nil {
  2057. aws.RIPricingByInstanceID = make(map[string]*RIData)
  2058. }
  2059. tNow := time.Now()
  2060. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  2061. start := tOneDayAgo.Format("2006-01-02")
  2062. end := tNow.Format("2006-01-02")
  2063. q := `SELECT
  2064. line_item_usage_start_date,
  2065. reservation_reservation_a_r_n,
  2066. line_item_resource_id,
  2067. reservation_effective_cost
  2068. FROM %s as cost_data
  2069. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  2070. AND reservation_reservation_a_r_n <> '' ORDER BY
  2071. line_item_usage_start_date DESC`
  2072. page := 0
  2073. mostRecentDate := ""
  2074. processResults := func(op *athena.GetQueryResultsOutput) bool {
  2075. if op == nil {
  2076. log.Errorf("GetReservationDataFromAthena: Athena page is nil")
  2077. return false
  2078. } else if op.ResultSet == nil {
  2079. log.Errorf("GetReservationDataFromAthena: Athena page.ResultSet is nil")
  2080. return false
  2081. }
  2082. aws.RIDataLock.Lock()
  2083. defer aws.RIDataLock.Unlock()
  2084. if page == 0 {
  2085. aws.RIPricingByInstanceID = make(map[string]*RIData) // Clean out the old data and only report a RI price if its in the most recent run.
  2086. }
  2087. iter := op.ResultSet.Rows
  2088. if page == 0 && len(iter) > 0 {
  2089. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  2090. }
  2091. page++
  2092. for _, r := range iter {
  2093. d := *r.Data[0].VarCharValue
  2094. if mostRecentDate == "" {
  2095. mostRecentDate = d
  2096. } else if mostRecentDate != d { // Get all most recent assignments
  2097. break
  2098. }
  2099. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  2100. if err != nil {
  2101. log.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  2102. }
  2103. r := &RIData{
  2104. ResourceID: *r.Data[2].VarCharValue,
  2105. EffectiveCost: cost,
  2106. ReservationARN: *r.Data[1].VarCharValue,
  2107. MostRecentDate: d,
  2108. }
  2109. aws.RIPricingByInstanceID[r.ResourceID] = r
  2110. }
  2111. log.Debugf("Found %d reserved instances", len(aws.RIPricingByInstanceID))
  2112. for k, r := range aws.RIPricingByInstanceID {
  2113. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  2114. }
  2115. return true
  2116. }
  2117. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  2118. log.Debugf("Running Query: %s", query)
  2119. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  2120. if err != nil {
  2121. aws.RIPricingError = err
  2122. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  2123. }
  2124. aws.RIPricingError = nil
  2125. return nil
  2126. }
  2127. // fetchColumns returns a list of the names of all columns in the configured
  2128. // Athena tables
  2129. func (aws *AWS) fetchColumns() (map[string]bool, error) {
  2130. columnSet := map[string]bool{}
  2131. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  2132. if err != nil {
  2133. return nil, err
  2134. }
  2135. // This Query is supported by Athena tables and views
  2136. q := `SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'`
  2137. query := fmt.Sprintf(q, awsAthenaInfo.AthenaDatabase, awsAthenaInfo.AthenaTable)
  2138. pageNum := 0
  2139. athenaErr := aws.QueryAthenaPaginated(context.TODO(), query, func(page *athena.GetQueryResultsOutput) bool {
  2140. if page == nil {
  2141. log.Errorf("fetchColumns: Athena page is nil")
  2142. return false
  2143. } else if page.ResultSet == nil {
  2144. log.Errorf("fetchColumns: Athena page.ResultSet is nil")
  2145. return false
  2146. }
  2147. // remove header row 'column_name'
  2148. rows := page.ResultSet.Rows[1:]
  2149. for _, row := range rows {
  2150. columnSet[*row.Data[0].VarCharValue] = true
  2151. }
  2152. pageNum++
  2153. return true
  2154. })
  2155. if athenaErr != nil {
  2156. return columnSet, athenaErr
  2157. }
  2158. if len(columnSet) == 0 {
  2159. log.Infof("No columns retrieved from Athena")
  2160. }
  2161. return columnSet, nil
  2162. }
// spotInfo is one record of spot-instance usage data. Each `csv` tag names the
// column the field is read from.
// NOTE(review): presumably a row of the EC2 Spot Instance data feed files that
// parseSpotData lists from S3; confirm against the decoder that fills it.
// All values are kept as raw strings, and no parsing happens here.
type spotInfo struct {
	Timestamp   string `csv:"Timestamp"`
	UsageType   string `csv:"UsageType"`
	Operation   string `csv:"Operation"`
	InstanceID  string `csv:"InstanceID"`
	MyBidID     string `csv:"MyBidID"`
	MyMaxPrice  string `csv:"MyMaxPrice"`
	MarketPrice string `csv:"MarketPrice"`
	Charge      string `csv:"Charge"`
	Version     string `csv:"Version"`
}
  2174. func (aws *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
  2175. aws.ConfigureAuth() // configure aws api authentication by setting env vars
  2176. s3Prefix := projectID
  2177. if len(prefix) != 0 {
  2178. s3Prefix = prefix + "/" + s3Prefix
  2179. }
  2180. aak, err := aws.GetAWSAccessKey()
  2181. if err != nil {
  2182. return nil, err
  2183. }
  2184. cfg, err := aak.CreateConfig(region)
  2185. if err != nil {
  2186. return nil, err
  2187. }
  2188. cli := s3.NewFromConfig(cfg)
  2189. downloader := manager.NewDownloader(cli)
  2190. tNow := time.Now()
  2191. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  2192. ls := &s3.ListObjectsInput{
  2193. Bucket: awsSDK.String(bucket),
  2194. Prefix: awsSDK.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  2195. }
  2196. ls2 := &s3.ListObjectsInput{
  2197. Bucket: awsSDK.String(bucket),
  2198. Prefix: awsSDK.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  2199. }
  2200. lso, err := cli.ListObjects(context.TODO(), ls)
  2201. if err != nil {
  2202. aws.ServiceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
  2203. Message: "Bucket List Permissions Available",
  2204. Status: false,
  2205. AdditionalInfo: err.Error(),
  2206. })
  2207. return nil, err
  2208. } else {
  2209. aws.ServiceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
  2210. Message: "Bucket List Permissions Available",
  2211. Status: true,
  2212. })
  2213. }
  2214. lsoLen := len(lso.Contents)
  2215. log.Debugf("Found %d spot data files from yesterday", lsoLen)
  2216. if lsoLen == 0 {
  2217. log.Debugf("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  2218. }
  2219. lso2, err := cli.ListObjects(context.TODO(), ls2)
  2220. if err != nil {
  2221. return nil, err
  2222. }
  2223. lso2Len := len(lso2.Contents)
  2224. log.Debugf("Found %d spot data files from today", lso2Len)
  2225. if lso2Len == 0 {
  2226. log.Debugf("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  2227. }
  2228. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  2229. var keys []*string
  2230. for _, obj := range lso.Contents {
  2231. keys = append(keys, obj.Key)
  2232. }
  2233. for _, obj := range lso2.Contents {
  2234. keys = append(keys, obj.Key)
  2235. }
  2236. header, err := csvutil.Header(spotInfo{}, "csv")
  2237. if err != nil {
  2238. return nil, err
  2239. }
  2240. fieldsPerRecord := len(header)
  2241. spots := make(map[string]*spotInfo)
  2242. for _, key := range keys {
  2243. getObj := &s3.GetObjectInput{
  2244. Bucket: awsSDK.String(bucket),
  2245. Key: key,
  2246. }
  2247. buf := manager.NewWriteAtBuffer([]byte{})
  2248. _, err := downloader.Download(context.TODO(), buf, getObj)
  2249. if err != nil {
  2250. aws.ServiceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
  2251. Message: "Object Get Permissions Available",
  2252. Status: false,
  2253. AdditionalInfo: err.Error(),
  2254. })
  2255. return nil, err
  2256. } else {
  2257. aws.ServiceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
  2258. Message: "Object Get Permissions Available",
  2259. Status: true,
  2260. })
  2261. }
  2262. r := bytes.NewReader(buf.Bytes())
  2263. gr, err := gzip.NewReader(r)
  2264. if err != nil {
  2265. return nil, err
  2266. }
  2267. csvReader := csv.NewReader(gr)
  2268. csvReader.Comma = '\t'
  2269. csvReader.FieldsPerRecord = fieldsPerRecord
  2270. dec, err := csvutil.NewDecoder(csvReader, header...)
  2271. if err != nil {
  2272. return nil, err
  2273. }
  2274. var foundVersion string
  2275. for {
  2276. spot := spotInfo{}
  2277. err := dec.Decode(&spot)
  2278. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  2279. if err == io.EOF {
  2280. break
  2281. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  2282. rec := dec.Record()
  2283. // the first two "Record()" will be the comment lines
  2284. // and they show up as len() == 1
  2285. // the first of which is "#Version"
  2286. // the second of which is "#Fields: "
  2287. if len(rec) != 1 {
  2288. log.Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  2289. continue
  2290. }
  2291. if len(foundVersion) == 0 {
  2292. spotFeedVersion := rec[0]
  2293. log.Debugf("Spot feed version is \"%s\"", spotFeedVersion)
  2294. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  2295. if matches != nil {
  2296. foundVersion = matches[1]
  2297. if foundVersion != supportedSpotFeedVersion {
  2298. log.Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  2299. break
  2300. }
  2301. }
  2302. continue
  2303. } else if strings.Index(rec[0], "#") == 0 {
  2304. continue
  2305. } else {
  2306. log.Infof("skipping non-TSV line: %s", rec)
  2307. continue
  2308. }
  2309. } else if err != nil {
  2310. log.Warnf("Error during spot info decode: %+v", err)
  2311. continue
  2312. }
  2313. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  2314. spots[spot.InstanceID] = &spot
  2315. }
  2316. gr.Close()
  2317. }
  2318. return spots, nil
  2319. }
// ApplyReservedInstancePricing is currently a no-op for AWS: its body is
// empty, so nodes is neither read nor modified.
//
// NOTE(review): reserved-instance data is collected into
// aws.RIPricingByInstanceID elsewhere in this file; confirm whether it should
// be applied to nodes here, or where it is applied instead.
func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*models.Node) {
}
  2323. func (aws *AWS) ServiceAccountStatus() *models.ServiceAccountStatus {
  2324. return aws.ServiceAccountChecks.GetStatus()
  2325. }
  2326. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  2327. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  2328. }
  2329. // Regions returns a predefined list of AWS regions
  2330. func (aws *AWS) Regions() []string {
  2331. regionOverrides := env.GetRegionOverrideList()
  2332. if len(regionOverrides) > 0 {
  2333. log.Debugf("Overriding AWS regions with configured region list: %+v", regionOverrides)
  2334. return regionOverrides
  2335. }
  2336. return awsRegions
  2337. }
  2338. // PricingSourceSummary returns the pricing source summary for the provider.
  2339. // The summary represents what was _parsed_ from the pricing source, not
  2340. // everything that was _available_ in the pricing source.
  2341. func (aws *AWS) PricingSourceSummary() interface{} {
  2342. // encode the pricing source summary as a JSON string
  2343. return aws.Pricing
  2344. }