provider.go 77 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536
  1. package aws
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/csv"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/aws/smithy-go"
  18. "github.com/opencost/opencost/pkg/cloud/models"
  19. "github.com/opencost/opencost/pkg/cloud/utils"
  20. "github.com/opencost/opencost/core/pkg/clustercache"
  21. coreenv "github.com/opencost/opencost/core/pkg/env"
  22. errs "github.com/opencost/opencost/core/pkg/errors"
  23. "github.com/opencost/opencost/core/pkg/log"
  24. "github.com/opencost/opencost/core/pkg/opencost"
  25. "github.com/opencost/opencost/core/pkg/util"
  26. "github.com/opencost/opencost/core/pkg/util/fileutil"
  27. "github.com/opencost/opencost/core/pkg/util/json"
  28. "github.com/opencost/opencost/core/pkg/util/timeutil"
  29. "github.com/opencost/opencost/pkg/env"
  30. awsSDK "github.com/aws/aws-sdk-go-v2/aws"
  31. "github.com/aws/aws-sdk-go-v2/config"
  32. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  33. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  34. "github.com/aws/aws-sdk-go-v2/service/athena"
  35. athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
  36. "github.com/aws/aws-sdk-go-v2/service/ec2"
  37. ec2Types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
  38. "github.com/aws/aws-sdk-go-v2/service/s3"
  39. "github.com/aws/aws-sdk-go-v2/service/sts"
  40. "github.com/jszwec/csvutil"
  41. )
  42. const (
  43. supportedSpotFeedVersion = "1"
  44. SpotInfoUpdateType = "spotinfo"
  45. AthenaInfoUpdateType = "athenainfo"
  46. PreemptibleType = "preemptible"
  47. APIPricingSource = "Public API"
  48. SpotPricingSource = "Spot Data Feed"
  49. ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
  50. FargatePricingSource = "Fargate"
  51. InUseState = "in-use"
  52. AttachedState = "attached"
  53. AWSHourlyPublicIPCost = 0.005
  54. EKSCapacityTypeLabel = "eks.amazonaws.com/capacityType"
  55. EKSCapacitySpotTypeValue = "SPOT"
  56. // relevant to pricing url
  57. awsPricingBaseURL = "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/"
  58. awsChinaPricingBaseURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/"
  59. pricingCurrentPath = "/current/"
  60. pricingIndexFile = "index.json"
  61. chinaRegionPrefix = "cn-"
  62. )
  63. var (
  64. // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  65. provIdRx = regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  66. usageTypeRegx = regexp.MustCompile(".*(-|^)(EBS.+)")
  67. versionRx = regexp.MustCompile(`^#Version: (\\d+)\\.\\d+$`)
  68. regionRx = regexp.MustCompile("([a-z]+-[a-z]+-[0-9])")
  69. ErrNoAthenaBucket = errors.New("No Athena Bucket configured")
  70. // StorageClassProvisionerDefaults specifies the default storage class types depending upon the provisioner
  71. StorageClassProvisionerDefaults = map[string]string{
  72. "kubernetes.io/aws-ebs": "gp2",
  73. "ebs.csi.aws.com": "gp3",
  74. // TODO: add efs provisioner
  75. }
  76. )
  77. func (aws *AWS) PricingSourceStatus() map[string]*models.PricingSource {
  78. sources := make(map[string]*models.PricingSource)
  79. sps := &models.PricingSource{
  80. Name: SpotPricingSource,
  81. Enabled: true,
  82. }
  83. if !aws.SpotRefreshEnabled() {
  84. sps.Available = false
  85. sps.Error = "Spot instances not set up"
  86. sps.Enabled = false
  87. } else {
  88. sps.Error = ""
  89. if aws.SpotPricingError != nil {
  90. sps.Error = aws.SpotPricingError.Error()
  91. }
  92. if sps.Error != "" {
  93. sps.Available = false
  94. } else if len(aws.SpotPricingByInstanceID) > 0 {
  95. sps.Available = true
  96. } else {
  97. sps.Error = "No spot instances detected"
  98. }
  99. }
  100. sources[SpotPricingSource] = sps
  101. rps := &models.PricingSource{
  102. Name: ReservedInstancePricingSource,
  103. Enabled: true,
  104. }
  105. rps.Error = ""
  106. if aws.RIPricingError != nil {
  107. rps.Error = aws.RIPricingError.Error()
  108. }
  109. if rps.Error != "" {
  110. rps.Available = false
  111. } else {
  112. rps.Available = true
  113. }
  114. sources[ReservedInstancePricingSource] = rps
  115. fs := &models.PricingSource{
  116. Name: FargatePricingSource,
  117. Enabled: true,
  118. Available: true,
  119. }
  120. if aws.FargatePricingError != nil {
  121. fs.Error = aws.FargatePricingError.Error()
  122. fs.Available = false
  123. }
  124. sources[FargatePricingSource] = fs
  125. return sources
  126. }
  127. // SpotRefreshDuration represents how much time must pass before we refresh
  128. const SpotRefreshDuration = 15 * time.Minute
  129. var awsRegions = []string{
  130. "us-east-2",
  131. "us-east-1",
  132. "us-west-1",
  133. "us-west-2",
  134. "ap-east-1",
  135. "ap-south-1",
  136. "ap-northeast-3",
  137. "ap-northeast-2",
  138. "ap-southeast-1",
  139. "ap-southeast-2",
  140. "ap-northeast-1",
  141. "ap-southeast-3",
  142. "ca-central-1",
  143. "cn-north-1",
  144. "cn-northwest-1",
  145. "eu-central-1",
  146. "eu-west-1",
  147. "eu-west-2",
  148. "eu-west-3",
  149. "eu-north-1",
  150. "eu-south-1",
  151. "me-south-1",
  152. "sa-east-1",
  153. "af-south-1",
  154. "us-gov-east-1",
  155. "us-gov-west-1",
  156. "me-central-1",
  157. }
  158. // AWS represents an Amazon Provider
  159. type AWS struct {
  160. Pricing map[string]*AWSProductTerms
  161. SpotPricingByInstanceID map[string]*spotInfo
  162. SpotPricingUpdatedAt *time.Time
  163. SpotRefreshRunning bool
  164. SpotPricingLock sync.RWMutex
  165. SpotPricingError error
  166. RIPricingByInstanceID map[string]*RIData
  167. RIPricingError error
  168. RIDataRunning bool
  169. RIDataLock sync.RWMutex
  170. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  171. SavingsPlanDataRunning bool
  172. SavingsPlanDataLock sync.RWMutex
  173. FargatePricing *FargatePricing
  174. FargatePricingError error
  175. ValidPricingKeys map[string]bool
  176. Clientset clustercache.ClusterCache
  177. BaseCPUPrice string
  178. BaseRAMPrice string
  179. BaseGPUPrice string
  180. BaseSpotCPUPrice string
  181. BaseSpotRAMPrice string
  182. BaseSpotGPUPrice string
  183. SpotLabelName string
  184. SpotLabelValue string
  185. SpotDataRegion string
  186. SpotDataBucket string
  187. SpotDataPrefix string
  188. ProjectID string
  189. DownloadPricingDataLock sync.RWMutex
  190. Config models.ProviderConfig
  191. ServiceAccountChecks *models.ServiceAccountChecks
  192. clusterManagementPrice float64
  193. ClusterRegion string
  194. ClusterAccountID string
  195. clusterProvisioner string
  196. }
  197. // AWSAccessKey holds AWS credentials and fulfils the awsV2.CredentialsProvider interface
  198. // Deprecated: v1.104 Use AccessKey instead
  199. type AWSAccessKey struct {
  200. AccessKeyID string `json:"aws_access_key_id"`
  201. SecretAccessKey string `json:"aws_secret_access_key"`
  202. }
  203. // Retrieve returns a set of awsV2 credentials using the AWSAccessKey's key and secret.
  204. // This fulfils the awsV2.CredentialsProvider interface contract.
  205. func (accessKey AWSAccessKey) Retrieve(ctx context.Context) (awsSDK.Credentials, error) {
  206. return awsSDK.Credentials{
  207. AccessKeyID: accessKey.AccessKeyID,
  208. SecretAccessKey: accessKey.SecretAccessKey,
  209. }, nil
  210. }
  211. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains for the provided region
  212. func (accessKey AWSAccessKey) CreateConfig(region string) (awsSDK.Config, error) {
  213. var cfg awsSDK.Config
  214. var err error
  215. // If accessKey values have not been provided, attempt to load cfg from service key annotations
  216. if accessKey.AccessKeyID == "" && accessKey.SecretAccessKey == "" {
  217. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithRegion(region))
  218. if err != nil {
  219. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region from annotation %s: %s", region, err)
  220. }
  221. } else {
  222. // The AWS SDK v2 requires an object fulfilling the CredentialsProvider interface, which cloud.AWSAccessKey does
  223. cfg, err = config.LoadDefaultConfig(context.TODO(), config.WithCredentialsProvider(accessKey), config.WithRegion(region))
  224. if err != nil {
  225. return cfg, fmt.Errorf("failed to initialize AWS SDK config for region %s: %s", region, err)
  226. }
  227. }
  228. return cfg, nil
  229. }
  230. // AWSProductTerms represents the full terms of the product
  231. type AWSProductTerms struct {
  232. Sku string `json:"sku"`
  233. OnDemand *PriceListEC2Term `json:"OnDemand"`
  234. Reserved *PriceListEC2Term `json:"Reserved"`
  235. Memory string `json:"memory"`
  236. Storage string `json:"storage"`
  237. VCpu string `json:"vcpu"`
  238. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  239. PV *models.PV `json:"pv"`
  240. LoadBalancer *models.LoadBalancer `json:"load_balancer"`
  241. }
  242. // volTypes are used to map between AWS UsageTypes and
  243. // EBS volume types, as they would appear in K8s storage class
  244. // name and the EC2 API.
  245. var volTypes = map[string]string{
  246. "EBS:VolumeUsage.gp2": "gp2",
  247. "EBS:VolumeUsage.gp3": "gp3",
  248. "EBS:VolumeUsage": "standard",
  249. "EBS:VolumeUsage.sc1": "sc1",
  250. "EBS:VolumeP-IOPS.piops": "io1",
  251. "EBS:VolumeUsage.st1": "st1",
  252. "EBS:VolumeUsage.piops": "io1",
  253. "EBS:VolumeUsage.io2": "io2",
  254. "gp2": "EBS:VolumeUsage.gp2",
  255. "gp3": "EBS:VolumeUsage.gp3",
  256. "standard": "EBS:VolumeUsage",
  257. "sc1": "EBS:VolumeUsage.sc1",
  258. "io1": "EBS:VolumeUsage.piops",
  259. "st1": "EBS:VolumeUsage.st1",
  260. "io2": "EBS:VolumeUsage.io2",
  261. }
  262. var (
  263. loadedAWSSecret bool = false
  264. awsSecret *AWSAccessKey = nil
  265. )
  266. // KubeAttrConversion maps the k8s labels for region to an AWS key
  267. func (aws *AWS) KubeAttrConversion(region, instanceType, operatingSystem string) string {
  268. operatingSystem = strings.ToLower(operatingSystem)
  269. return region + "," + instanceType + "," + operatingSystem
  270. }
  271. // AwsSpotFeedInfo contains configuration for spot feed integration
  272. type AwsSpotFeedInfo struct {
  273. BucketName string `json:"bucketName"`
  274. Prefix string `json:"prefix"`
  275. Region string `json:"region"`
  276. AccountID string `json:"projectID"`
  277. ServiceKeyName string `json:"serviceKeyName"`
  278. ServiceKeySecret string `json:"serviceKeySecret"`
  279. SpotLabel string `json:"spotLabel"`
  280. SpotLabelValue string `json:"spotLabelValue"`
  281. }
  282. // AwsAthenaInfo contains configuration for CUR integration
  283. // Deprecated: v1.104 Use AthenaConfiguration instead
  284. type AwsAthenaInfo struct {
  285. AthenaBucketName string `json:"athenaBucketName"`
  286. AthenaRegion string `json:"athenaRegion"`
  287. AthenaDatabase string `json:"athenaDatabase"`
  288. AthenaCatalog string `json:"athenaCatalog"`
  289. AthenaTable string `json:"athenaTable"`
  290. AthenaWorkgroup string `json:"athenaWorkgroup"`
  291. ServiceKeyName string `json:"serviceKeyName"`
  292. ServiceKeySecret string `json:"serviceKeySecret"`
  293. AccountID string `json:"projectID"`
  294. MasterPayerARN string `json:"masterPayerARN"`
  295. }
  296. // IsEmpty returns true if all fields in config are empty, false if not.
  297. func (aai *AwsAthenaInfo) IsEmpty() bool {
  298. return aai.AthenaBucketName == "" &&
  299. aai.AthenaRegion == "" &&
  300. aai.AthenaDatabase == "" &&
  301. aai.AthenaCatalog == "" &&
  302. aai.AthenaTable == "" &&
  303. aai.AthenaWorkgroup == "" &&
  304. aai.ServiceKeyName == "" &&
  305. aai.ServiceKeySecret == "" &&
  306. aai.AccountID == "" &&
  307. aai.MasterPayerARN == ""
  308. }
  309. // CreateConfig creates an AWS SDK V2 Config for the credentials that it contains
  310. func (aai *AwsAthenaInfo) CreateConfig() (awsSDK.Config, error) {
  311. keyProvider := AWSAccessKey{AccessKeyID: aai.ServiceKeyName, SecretAccessKey: aai.ServiceKeySecret}
  312. cfg, err := keyProvider.CreateConfig(aai.AthenaRegion)
  313. if err != nil {
  314. return cfg, err
  315. }
  316. if aai.MasterPayerARN != "" {
  317. // Create the credentials from AssumeRoleProvider to assume the role
  318. // referenced by the roleARN.
  319. stsSvc := sts.NewFromConfig(cfg)
  320. creds := stscreds.NewAssumeRoleProvider(stsSvc, aai.MasterPayerARN)
  321. cfg.Credentials = awsSDK.NewCredentialsCache(creds)
  322. }
  323. return cfg, nil
  324. }
  325. func (aws *AWS) GetManagementPlatform() (string, error) {
  326. nodes := aws.Clientset.GetAllNodes()
  327. if len(nodes) > 0 {
  328. n := nodes[0]
  329. version := n.Status.NodeInfo.KubeletVersion
  330. if strings.Contains(version, "eks") {
  331. return "eks", nil
  332. }
  333. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  334. return "kops", nil
  335. }
  336. }
  337. return "", nil
  338. }
  339. func (aws *AWS) GetConfig() (*models.CustomPricing, error) {
  340. c, err := aws.Config.GetCustomPricingData()
  341. if err != nil {
  342. return nil, err
  343. }
  344. if c.Discount == "" {
  345. c.Discount = "0%"
  346. }
  347. if c.NegotiatedDiscount == "" {
  348. c.NegotiatedDiscount = "0%"
  349. }
  350. return c, nil
  351. }
  352. // GetAWSAccessKey generate an AWSAccessKey object from the config
  353. func (aws *AWS) GetAWSAccessKey() (*AWSAccessKey, error) {
  354. config, err := aws.GetConfig()
  355. if err != nil {
  356. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  357. }
  358. err = aws.ConfigureAuthWith(config)
  359. if err != nil {
  360. return nil, fmt.Errorf("error configuring Cloud Provider %s", err)
  361. }
  362. // Look for service key values in env if not present in config
  363. if config.AwsServiceKeyName == "" {
  364. config.AwsServiceKeyName = env.GetAWSAccessKeyID()
  365. }
  366. if config.AwsServiceKeySecret == "" {
  367. config.AwsServiceKeySecret = env.GetAWSAccessKeySecret()
  368. }
  369. if config.AwsServiceKeyName == "" && config.AwsServiceKeySecret == "" {
  370. log.DedupedInfof(1, "missing service key values for AWS cloud integration attempting to use service account integration")
  371. }
  372. return &AWSAccessKey{AccessKeyID: config.AwsServiceKeyName, SecretAccessKey: config.AwsServiceKeySecret}, nil
  373. }
  374. // GetAWSAthenaInfo generate an AWSAthenaInfo object from the config
  375. func (aws *AWS) GetAWSAthenaInfo() (*AwsAthenaInfo, error) {
  376. config, err := aws.GetConfig()
  377. if err != nil {
  378. return nil, fmt.Errorf("could not retrieve AwsAthenaInfo %s", err)
  379. }
  380. aak, err := aws.GetAWSAccessKey()
  381. if err != nil {
  382. return nil, err
  383. }
  384. return &AwsAthenaInfo{
  385. AthenaBucketName: config.AthenaBucketName,
  386. AthenaRegion: config.AthenaRegion,
  387. AthenaDatabase: config.AthenaDatabase,
  388. AthenaCatalog: config.AthenaCatalog,
  389. AthenaTable: config.AthenaTable,
  390. AthenaWorkgroup: config.AthenaWorkgroup,
  391. ServiceKeyName: aak.AccessKeyID,
  392. ServiceKeySecret: aak.SecretAccessKey,
  393. AccountID: config.AthenaProjectID,
  394. MasterPayerARN: config.MasterPayerARN,
  395. }, nil
  396. }
  397. func (aws *AWS) UpdateConfigFromConfigMap(cm map[string]string) (*models.CustomPricing, error) {
  398. return aws.Config.UpdateFromMap(cm)
  399. }
  400. func configUpdaterWithReaderAndType(r io.Reader, updateType string) func(c *models.CustomPricing) error {
  401. return func(c *models.CustomPricing) error {
  402. switch updateType {
  403. case SpotInfoUpdateType:
  404. asfi := AwsSpotFeedInfo{}
  405. err := json.NewDecoder(r).Decode(&asfi)
  406. if err != nil {
  407. return err
  408. }
  409. c.AwsServiceKeyName = asfi.ServiceKeyName
  410. if asfi.ServiceKeySecret != "" {
  411. c.AwsServiceKeySecret = asfi.ServiceKeySecret
  412. }
  413. c.AwsSpotDataPrefix = asfi.Prefix
  414. c.AwsSpotDataBucket = asfi.BucketName
  415. c.ProjectID = asfi.AccountID
  416. c.AwsSpotDataRegion = asfi.Region
  417. c.SpotLabel = asfi.SpotLabel
  418. c.SpotLabelValue = asfi.SpotLabelValue
  419. case AthenaInfoUpdateType:
  420. aai := AwsAthenaInfo{}
  421. err := json.NewDecoder(r).Decode(&aai)
  422. if err != nil {
  423. return err
  424. }
  425. c.AthenaBucketName = aai.AthenaBucketName
  426. c.AthenaRegion = aai.AthenaRegion
  427. c.AthenaDatabase = aai.AthenaDatabase
  428. c.AthenaCatalog = aai.AthenaCatalog
  429. c.AthenaTable = aai.AthenaTable
  430. c.AthenaWorkgroup = aai.AthenaWorkgroup
  431. c.AwsServiceKeyName = aai.ServiceKeyName
  432. if aai.ServiceKeySecret != "" {
  433. c.AwsServiceKeySecret = aai.ServiceKeySecret
  434. }
  435. if aai.MasterPayerARN != "" {
  436. c.MasterPayerARN = aai.MasterPayerARN
  437. }
  438. c.AthenaProjectID = aai.AccountID
  439. default:
  440. a := make(map[string]any)
  441. err := json.NewDecoder(r).Decode(&a)
  442. if err != nil {
  443. return err
  444. }
  445. for k, v := range a {
  446. kUpper := utils.ToTitle.String(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  447. vstr, ok := v.(string)
  448. if ok {
  449. err := models.SetCustomPricingField(c, kUpper, vstr)
  450. if err != nil {
  451. return fmt.Errorf("error setting custom pricing field: %w", err)
  452. }
  453. } else {
  454. return fmt.Errorf("type error while updating config for %s", kUpper)
  455. }
  456. }
  457. }
  458. if env.IsRemoteEnabled() {
  459. err := utils.UpdateClusterMeta(coreenv.GetClusterID(), c.ClusterName)
  460. if err != nil {
  461. return err
  462. }
  463. }
  464. return nil
  465. }
  466. }
  467. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*models.CustomPricing, error) {
  468. return aws.Config.Update(configUpdaterWithReaderAndType(r, updateType))
  469. }
  470. type awsKey struct {
  471. Name string
  472. SpotLabelName string
  473. SpotLabelValue string
  474. Labels map[string]string
  475. ProviderID string
  476. }
  477. func (k *awsKey) GPUCount() int {
  478. return 0
  479. }
  480. func (k *awsKey) GPUType() string {
  481. return ""
  482. }
  483. func (k *awsKey) ID() string {
  484. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  485. if matchNum == 2 {
  486. return group
  487. }
  488. }
  489. log.Warnf("Could not find instance ID in \"%s\"", k.ProviderID)
  490. return ""
  491. }
  492. // Features will return a comma separated list of features for the given node
  493. // If the node has a spot label, it will be included in the list
  494. // Otherwise, the list include instance type, operating system, and the region
  495. func (k *awsKey) Features() string {
  496. instanceType, _ := util.GetInstanceType(k.Labels)
  497. operatingSystem, _ := util.GetOperatingSystem(k.Labels)
  498. region, _ := util.GetRegion(k.Labels)
  499. key := region + "," + instanceType + "," + operatingSystem
  500. usageType := k.getUsageType(k.Labels)
  501. spotKey := key + "," + usageType
  502. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  503. return spotKey
  504. }
  505. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  506. return spotKey
  507. }
  508. if usageType == PreemptibleType {
  509. return spotKey
  510. }
  511. return key
  512. }
  513. const eksComputeTypeLabel = "eks.amazonaws.com/compute-type"
  514. func (k *awsKey) isFargateNode() bool {
  515. v := k.Labels[eksComputeTypeLabel]
  516. if v == "fargate" {
  517. return true
  518. }
  519. return false
  520. }
  521. // getUsageType returns the usage type of the instance
  522. // If the instance is a spot instance, it will return PreemptibleType
  523. // Otherwise returns an empty string
  524. func (k *awsKey) getUsageType(labels map[string]string) string {
  525. if kLabel, ok := labels[k.SpotLabelName]; ok && kLabel == k.SpotLabelValue {
  526. return PreemptibleType
  527. }
  528. if eksLabel, ok := labels[EKSCapacityTypeLabel]; ok && eksLabel == EKSCapacitySpotTypeValue {
  529. // We currently write out spot instances as "preemptible" in the pricing data, so these need to match
  530. return PreemptibleType
  531. }
  532. if kLabel, ok := labels[models.KarpenterCapacityTypeLabel]; ok && kLabel == models.KarpenterCapacitySpotTypeValue {
  533. return PreemptibleType
  534. }
  535. return ""
  536. }
  537. func (awsProvider *AWS) GpuPricing(nodeLabels map[string]string) (string, error) {
  538. return "", nil
  539. }
  540. func (aws *AWS) PVPricing(pvk models.PVKey) (*models.PV, error) {
  541. pricing, ok := aws.Pricing[pvk.Features()]
  542. if !ok {
  543. log.Debugf("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  544. return &models.PV{}, nil
  545. }
  546. return pricing.PV, nil
  547. }
  548. type awsPVKey struct {
  549. Labels map[string]string
  550. StorageClassParameters map[string]string
  551. StorageClassName string
  552. Name string
  553. DefaultRegion string
  554. ProviderID string
  555. }
  556. func (aws *AWS) GetPVKey(pv *clustercache.PersistentVolume, parameters map[string]string, defaultRegion string) models.PVKey {
  557. providerID := ""
  558. if pv.Spec.AWSElasticBlockStore != nil {
  559. providerID = pv.Spec.AWSElasticBlockStore.VolumeID
  560. } else if pv.Spec.CSI != nil {
  561. providerID = pv.Spec.CSI.VolumeHandle
  562. }
  563. return &awsPVKey{
  564. Labels: pv.Labels,
  565. StorageClassName: pv.Spec.StorageClassName,
  566. StorageClassParameters: parameters,
  567. Name: pv.Name,
  568. DefaultRegion: defaultRegion,
  569. ProviderID: providerID,
  570. }
  571. }
  572. func (key *awsPVKey) ID() string {
  573. return key.ProviderID
  574. }
  575. func (key *awsPVKey) GetStorageClass() string {
  576. return key.StorageClassName
  577. }
  578. func (key *awsPVKey) Features() string {
  579. storageClass, ok := key.StorageClassParameters["type"]
  580. if !ok {
  581. log.Debugf("storage class %s doesn't have a 'type' parameter", key.Name)
  582. storageClass = getStorageClassTypeFrom(key.StorageClassParameters["provisioner"])
  583. }
  584. if storageClass == "standard" {
  585. storageClass = "gp2"
  586. }
  587. // Storage class names are generally EBS volume types (gp2)
  588. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  589. // Converts between the 2
  590. region, ok := util.GetRegion(key.Labels)
  591. if !ok {
  592. region = key.DefaultRegion
  593. }
  594. class, ok := volTypes[storageClass]
  595. if !ok {
  596. log.Debugf("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  597. }
  598. return region + "," + class
  599. }
  600. // getStorageClassTypeFrom returns the default ebs volume type for a provider provisioner
  601. func getStorageClassTypeFrom(provisioner string) string {
  602. // if there isn't any provided provisioner, return empty volume type
  603. if provisioner == "" {
  604. return ""
  605. }
  606. scType, ok := StorageClassProvisionerDefaults[provisioner]
  607. if ok {
  608. log.Debugf("using default voltype %s for provisioner %s", scType, provisioner)
  609. return scType
  610. }
  611. return ""
  612. }
  613. // GetKey maps node labels to information needed to retrieve pricing data
  614. func (aws *AWS) GetKey(labels map[string]string, n *clustercache.Node) models.Key {
  615. return &awsKey{
  616. Name: n.Name,
  617. SpotLabelName: aws.SpotLabelName,
  618. SpotLabelValue: aws.SpotLabelValue,
  619. Labels: labels,
  620. ProviderID: labels["providerID"],
  621. }
  622. }
  623. func (aws *AWS) isPreemptible(key string) bool {
  624. s := strings.Split(key, ",")
  625. if len(s) == 4 && s[3] == PreemptibleType {
  626. return true
  627. }
  628. return false
  629. }
  630. func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
  631. return aws.clusterProvisioner, aws.clusterManagementPrice, nil
  632. }
  633. func getPricingListURL(serviceCode string, nodeList []*clustercache.Node) string {
  634. // See https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/using-the-aws-price-list-bulk-api-fetching-price-list-files-manually.html
  635. region := ""
  636. multiregion := false
  637. isChina := false
  638. for _, n := range nodeList {
  639. r, ok := util.GetRegion(n.Labels)
  640. if !ok {
  641. multiregion = true
  642. break
  643. }
  644. if strings.HasPrefix(r, chinaRegionPrefix) {
  645. isChina = true
  646. }
  647. if region == "" {
  648. region = r
  649. } else if r != region {
  650. multiregion = true
  651. break
  652. }
  653. }
  654. baseURL := awsPricingBaseURL + serviceCode + pricingCurrentPath
  655. if isChina {
  656. // Chinese regions are isolated and use a different pricing endpoint
  657. baseURL = awsChinaPricingBaseURL + serviceCode + pricingCurrentPath
  658. }
  659. if region != "" && !multiregion {
  660. baseURL += region + "/"
  661. }
  662. return baseURL + pricingIndexFile
  663. }
  664. // Use the pricing data from the current region. Fall back to using all region data if needed.
  665. func (aws *AWS) getRegionPricing(nodeList []*clustercache.Node) (*http.Response, string, error) {
  666. var pricingURL string
  667. if env.GetAWSPricingURL() != "" { // Allow override of pricing URL
  668. pricingURL = env.GetAWSPricingURL()
  669. } else {
  670. pricingURL = getPricingListURL("AmazonEC2", nodeList)
  671. }
  672. log.Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  673. resp, err := http.Get(pricingURL)
  674. if err != nil {
  675. log.Errorf("Bogus fetch of \"%s\": %v", pricingURL, err)
  676. return nil, pricingURL, err
  677. }
  678. return resp, pricingURL, err
  679. }
  680. // SpotRefreshEnabled determines whether the required configs to run the spot feed query have been set up
  681. func (aws *AWS) SpotRefreshEnabled() bool {
  682. // Guard against nil receiver
  683. if aws == nil {
  684. return false
  685. }
  686. // Fallback if config is not initialized
  687. if aws.Config == nil {
  688. return len(aws.SpotDataBucket) != 0 ||
  689. len(aws.SpotDataRegion) != 0 ||
  690. len(aws.ProjectID) != 0
  691. }
  692. // Check if spot data feed is explicitly disabled via config
  693. c, err := aws.Config.GetCustomPricingData()
  694. if err == nil && c.SpotDataFeedEnabled == "false" {
  695. return false
  696. }
  697. // Default behavior
  698. return len(aws.SpotDataBucket) != 0 ||
  699. len(aws.SpotDataRegion) != 0 ||
  700. len(aws.ProjectID) != 0
  701. }
  702. // DownloadPricingData fetches data from the AWS Pricing API
  703. func (aws *AWS) DownloadPricingData() error {
  704. aws.DownloadPricingDataLock.Lock()
  705. defer aws.DownloadPricingDataLock.Unlock()
  706. c, err := aws.Config.GetCustomPricingData()
  707. if err != nil {
  708. log.Errorf("Error downloading default pricing data: %s", err.Error())
  709. }
  710. aws.BaseCPUPrice = c.CPU
  711. aws.BaseRAMPrice = c.RAM
  712. aws.BaseGPUPrice = c.GPU
  713. aws.BaseSpotCPUPrice = c.SpotCPU
  714. aws.BaseSpotRAMPrice = c.SpotRAM
  715. aws.BaseSpotGPUPrice = c.SpotGPU
  716. aws.SpotLabelName = c.SpotLabel
  717. aws.SpotLabelValue = c.SpotLabelValue
  718. aws.SpotDataBucket = c.AwsSpotDataBucket
  719. aws.SpotDataPrefix = c.AwsSpotDataPrefix
  720. aws.ProjectID = c.ProjectID
  721. aws.SpotDataRegion = c.AwsSpotDataRegion
  722. aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
  723. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  724. log.Warnf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  725. }
  726. nodeList := aws.Clientset.GetAllNodes()
  727. inputkeys := make(map[string]bool)
  728. for _, n := range nodeList {
  729. if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
  730. aws.clusterManagementPrice = 0.10
  731. aws.clusterProvisioner = "EKS"
  732. } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  733. aws.clusterProvisioner = "KOPS"
  734. }
  735. labels := n.Labels
  736. key := aws.GetKey(labels, n)
  737. inputkeys[key.Features()] = true
  738. }
  739. pvList := aws.Clientset.GetAllPersistentVolumes()
  740. storageClasses := aws.Clientset.GetAllStorageClasses()
  741. storageClassMap := make(map[string]map[string]string)
  742. for _, storageClass := range storageClasses {
  743. params := storageClass.Parameters
  744. if params != nil {
  745. params["provisioner"] = storageClass.Provisioner
  746. }
  747. storageClassMap[storageClass.Name] = params
  748. if storageClass.Annotations["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.Annotations["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  749. storageClassMap["default"] = params
  750. storageClassMap[""] = params
  751. }
  752. }
  753. pvkeys := make(map[string]models.PVKey)
  754. for _, pv := range pvList {
  755. params, ok := storageClassMap[pv.Spec.StorageClassName]
  756. if !ok {
  757. log.Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  758. continue
  759. }
  760. key := aws.GetPVKey(pv, params, "")
  761. pvkeys[key.Features()] = key
  762. }
  763. // RIDataRunning establishes the existence of the goroutine. Since it's possible we
  764. // run multiple downloads, we don't want to create multiple go routines if one already exists
  765. //
  766. // If athenaBucketName is unconfigured, the ReservedInstanceData and SavingsPlanData watchers
  767. // are skipped. Note: These watchers are less commonly used. It is recommended to use the full
  768. // CloudCosts feature via athenaintegration.go.
  769. if !aws.RIDataRunning {
  770. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  771. if err != nil {
  772. if errors.Is(err, ErrNoAthenaBucket) {
  773. log.Debugf("No \"athenaBucketName\" configured, ReservedInstanceData watcher will not run")
  774. } else {
  775. log.Warnf("Failed to lookup reserved instance data: %s", err.Error())
  776. }
  777. } else { // If we make one successful run, check on new reservation data every hour
  778. go func() {
  779. defer errs.HandlePanic()
  780. aws.RIDataRunning = true
  781. for {
  782. log.Infof("Reserved Instance watcher running... next update in 1h")
  783. time.Sleep(time.Hour)
  784. err := aws.GetReservationDataFromAthena()
  785. if err != nil {
  786. log.Infof("Error updating RI data: %s", err.Error())
  787. }
  788. }
  789. }()
  790. }
  791. }
  792. if !aws.SavingsPlanDataRunning {
  793. err = aws.GetSavingsPlanDataFromAthena()
  794. if err != nil {
  795. if errors.Is(err, ErrNoAthenaBucket) {
  796. log.Debugf("No \"athenaBucketName\" configured, SavingsPlanData watcher will not run")
  797. } else {
  798. log.Errorf("Failed to lookup savings plan data: %s", err.Error())
  799. }
  800. } else {
  801. go func() {
  802. defer errs.HandlePanic()
  803. aws.SavingsPlanDataRunning = true
  804. for {
  805. log.Infof("Savings Plan watcher running... next update in 1h")
  806. time.Sleep(time.Hour)
  807. err := aws.GetSavingsPlanDataFromAthena()
  808. if err != nil {
  809. log.Infof("Error updating Savings Plan data: %s", err.Error())
  810. }
  811. }
  812. }()
  813. }
  814. }
  815. // Initialize fargate pricing if it's not initialized yet
  816. if aws.FargatePricing == nil {
  817. aws.FargatePricing = NewFargatePricing()
  818. aws.FargatePricingError = aws.FargatePricing.Initialize(nodeList)
  819. if aws.FargatePricingError != nil {
  820. log.Errorf("Failed to initialize fargate pricing: %s", aws.FargatePricingError.Error())
  821. }
  822. }
  823. aws.ValidPricingKeys = make(map[string]bool)
  824. resp, pricingURL, err := aws.getRegionPricing(nodeList)
  825. if err != nil {
  826. return err
  827. }
  828. err = aws.populatePricing(resp, inputkeys)
  829. if err != nil {
  830. return err
  831. }
  832. log.Infof("Finished downloading \"%s\"", pricingURL)
  833. if !aws.SpotRefreshEnabled() {
  834. return nil
  835. }
  836. // Always run spot pricing refresh when performing download
  837. aws.refreshSpotPricing(true)
  838. // Only start a single refresh goroutine
  839. if !aws.SpotRefreshRunning {
  840. aws.SpotRefreshRunning = true
  841. go func() {
  842. defer errs.HandlePanic()
  843. for {
  844. log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  845. time.Sleep(SpotRefreshDuration)
  846. // Reoccurring refresh checks update times
  847. aws.refreshSpotPricing(false)
  848. }
  849. }()
  850. }
  851. return nil
  852. }
  853. func (aws *AWS) populatePricing(resp *http.Response, inputkeys map[string]bool) error {
  854. aws.Pricing = make(map[string]*AWSProductTerms)
  855. skusToKeys := make(map[string]string)
  856. dec := json.NewDecoder(resp.Body)
  857. for {
  858. t, err := dec.Token()
  859. if err == io.EOF {
  860. log.Infof("done loading \"%s\"\n", resp.Request.URL.String())
  861. break
  862. } else if err != nil {
  863. log.Errorf("error parsing response json %v", resp.Body)
  864. break
  865. }
  866. if t == "products" {
  867. _, err := dec.Token() // this should parse the opening "{""
  868. if err != nil {
  869. return err
  870. }
  871. for dec.More() {
  872. _, err := dec.Token() // the sku token
  873. if err != nil {
  874. return err
  875. }
  876. product := &PriceListEC2Product{}
  877. err = dec.Decode(&product)
  878. if err != nil {
  879. log.Errorf("Error parsing response from \"%s\": %v", resp.Request.URL.String(), err.Error())
  880. break
  881. }
  882. if product.Attributes.PreInstalledSw == "NA" &&
  883. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
  884. product.Attributes.CapacityStatus == "Used" &&
  885. product.Attributes.MarketOption == "OnDemand" {
  886. key := aws.KubeAttrConversion(product.Attributes.RegionCode, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  887. spotKey := key + ",preemptible"
  888. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  889. productTerms := &AWSProductTerms{
  890. Sku: product.Sku,
  891. Memory: product.Attributes.Memory,
  892. Storage: product.Attributes.Storage,
  893. VCpu: product.Attributes.VCpu,
  894. GPU: product.Attributes.GPU,
  895. }
  896. aws.Pricing[key] = productTerms
  897. aws.Pricing[spotKey] = productTerms
  898. skusToKeys[product.Sku] = key
  899. }
  900. aws.ValidPricingKeys[key] = true
  901. aws.ValidPricingKeys[spotKey] = true
  902. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  903. // UsageTypes may be prefixed with a region code - we're removing this when using
  904. // volTypes to keep lookups generic
  905. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  906. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  907. key := product.Attributes.RegionCode + "," + usageTypeNoRegion
  908. spotKey := key + ",preemptible"
  909. pv := &models.PV{
  910. Class: volTypes[usageTypeNoRegion],
  911. Region: product.Attributes.RegionCode,
  912. }
  913. productTerms := &AWSProductTerms{
  914. Sku: product.Sku,
  915. PV: pv,
  916. }
  917. aws.Pricing[key] = productTerms
  918. aws.Pricing[spotKey] = productTerms
  919. skusToKeys[product.Sku] = key
  920. aws.ValidPricingKeys[key] = true
  921. aws.ValidPricingKeys[spotKey] = true
  922. } else if strings.Contains(product.Attributes.UsageType, "LoadBalancerUsage") && product.Attributes.Operation == "LoadBalancing:Network" {
  923. // since the costmodel is only using services of type LoadBalancer
  924. // (and not ingresses controlled by AWS load balancer controller)
  925. // we can safely filter for Network load balancers only
  926. productTerms := &AWSProductTerms{
  927. Sku: product.Sku,
  928. LoadBalancer: &models.LoadBalancer{},
  929. }
  930. // there is no spot pricing for load balancers
  931. key := product.Attributes.RegionCode + ",LoadBalancerUsage"
  932. aws.Pricing[key] = productTerms
  933. skusToKeys[product.Sku] = key
  934. aws.ValidPricingKeys[key] = true
  935. }
  936. }
  937. }
  938. if t == "terms" {
  939. _, err := dec.Token() // this should parse the opening "{""
  940. if err != nil {
  941. return err
  942. }
  943. termType, err := dec.Token()
  944. if err != nil {
  945. return err
  946. }
  947. if termType == "OnDemand" {
  948. _, err := dec.Token()
  949. if err != nil { // again, should parse an opening "{"
  950. return err
  951. }
  952. for dec.More() {
  953. sku, err := dec.Token()
  954. if err != nil {
  955. return err
  956. }
  957. _, err = dec.Token() // another opening "{"
  958. if err != nil {
  959. return err
  960. }
  961. // SKUOndemand
  962. _, err = dec.Token()
  963. if err != nil {
  964. return err
  965. }
  966. offerTerm := &PriceListEC2Term{}
  967. err = dec.Decode(&offerTerm)
  968. if err != nil {
  969. log.Errorf("Error decoding AWS Offer Term: %s", err.Error())
  970. }
  971. key, ok := skusToKeys[sku.(string)]
  972. spotKey := key + ",preemptible"
  973. if ok {
  974. aws.Pricing[key].OnDemand = offerTerm
  975. if _, ok := aws.Pricing[spotKey]; ok {
  976. aws.Pricing[spotKey].OnDemand = offerTerm
  977. }
  978. var cost string
  979. if _, isMatch := OnDemandRateCodes[offerTerm.OfferTermCode]; isMatch {
  980. priceDimensionKey := strings.Join([]string{sku.(string), offerTerm.OfferTermCode, HourlyRateCode}, ".")
  981. dimension, ok := offerTerm.PriceDimensions[priceDimensionKey]
  982. if ok {
  983. cost = dimension.PricePerUnit.USD
  984. } else {
  985. // this is an edge case seen in AWS CN pricing files, including here just in case
  986. // if there is only one dimension, use it, even if the key is incorrect, otherwise assume defaults
  987. if len(offerTerm.PriceDimensions) == 1 {
  988. for key, backupDimension := range offerTerm.PriceDimensions {
  989. cost = backupDimension.PricePerUnit.USD
  990. log.DedupedWarningf(5, "using:%s for a price dimension instead of missing dimension: %s", offerTerm.PriceDimensions[key], priceDimensionKey)
  991. break
  992. }
  993. } else if len(offerTerm.PriceDimensions) == 0 {
  994. log.DedupedWarningf(5, "populatePricing: no pricing dimension available for: %s.", priceDimensionKey)
  995. } else {
  996. log.DedupedWarningf(5, "populatePricing: no assumable pricing dimension available for: %s.", priceDimensionKey)
  997. }
  998. }
  999. } else if _, isMatch := OnDemandRateCodesCn[offerTerm.OfferTermCode]; isMatch {
  1000. priceDimensionKey := strings.Join([]string{sku.(string), offerTerm.OfferTermCode, HourlyRateCodeCn}, ".")
  1001. dimension, ok := offerTerm.PriceDimensions[priceDimensionKey]
  1002. if ok {
  1003. cost = dimension.PricePerUnit.CNY
  1004. } else {
  1005. // fall through logic for handling inconsistencies in AWS CN pricing files
  1006. // if there is only one dimension, use it, even if the key is incorrect, otherwise assume defaults
  1007. if len(offerTerm.PriceDimensions) == 1 {
  1008. for key, backupDimension := range offerTerm.PriceDimensions {
  1009. cost = backupDimension.PricePerUnit.CNY
  1010. log.DedupedWarningf(5, "using:%s for a price dimension instead of missing dimension: %s", offerTerm.PriceDimensions[key], priceDimensionKey)
  1011. break
  1012. }
  1013. } else if len(offerTerm.PriceDimensions) == 0 {
  1014. log.DedupedWarningf(5, "populatePricing: no pricing dimension available for: %s.", priceDimensionKey)
  1015. } else {
  1016. log.DedupedWarningf(5, "populatePricing: no assumable pricing dimension available for: %s.", priceDimensionKey)
  1017. }
  1018. }
  1019. }
  1020. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  1021. // If the specific UsageType is the per IO cost used on io1 volumes
  1022. // we need to add the per IO cost to the io1 PV cost
  1023. // Add the per IO cost to the PV object for the io1 volume type
  1024. aws.Pricing[key].PV.CostPerIO = cost
  1025. } else if strings.Contains(key, "EBS:Volume") {
  1026. // If volume, we need to get hourly cost and add it to the PV object
  1027. costFloat, _ := strconv.ParseFloat(cost, 64)
  1028. hourlyPrice := costFloat / 730
  1029. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  1030. } else if strings.Contains(key, "LoadBalancerUsage") {
  1031. costFloat, err := strconv.ParseFloat(cost, 64)
  1032. if err != nil {
  1033. return err
  1034. }
  1035. aws.Pricing[key].LoadBalancer.Cost = costFloat
  1036. }
  1037. }
  1038. _, err = dec.Token()
  1039. if err != nil {
  1040. return err
  1041. }
  1042. }
  1043. _, err = dec.Token()
  1044. if err != nil {
  1045. return err
  1046. }
  1047. }
  1048. }
  1049. }
  1050. return nil
  1051. }
  1052. func (aws *AWS) refreshSpotPricing(force bool) {
  1053. aws.SpotPricingLock.Lock()
  1054. defer aws.SpotPricingLock.Unlock()
  1055. now := time.Now().UTC()
  1056. updateTime := now.Add(-SpotRefreshDuration)
  1057. // Return if there was an update time set and an hour hasn't elapsed
  1058. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  1059. return
  1060. }
  1061. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
  1062. if err != nil {
  1063. log.Warnf("Skipping AWS spot data download: %s", err.Error())
  1064. aws.SpotPricingError = err
  1065. return
  1066. }
  1067. aws.SpotPricingError = nil
  1068. // update time last updated
  1069. aws.SpotPricingUpdatedAt = &now
  1070. aws.SpotPricingByInstanceID = sp
  1071. }
  1072. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  1073. func (aws *AWS) NetworkPricing() (*models.Network, error) {
  1074. cpricing, err := aws.Config.GetCustomPricingData()
  1075. if err != nil {
  1076. return nil, err
  1077. }
  1078. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  1079. if err != nil {
  1080. return nil, err
  1081. }
  1082. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  1083. if err != nil {
  1084. return nil, err
  1085. }
  1086. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  1087. if err != nil {
  1088. return nil, err
  1089. }
  1090. nge, err := strconv.ParseFloat(cpricing.NatGatewayEgress, 64)
  1091. if err != nil {
  1092. return nil, err
  1093. }
  1094. ngi, err := strconv.ParseFloat(cpricing.NatGatewayIngress, 64)
  1095. if err != nil {
  1096. return nil, err
  1097. }
  1098. return &models.Network{
  1099. ZoneNetworkEgressCost: znec,
  1100. RegionNetworkEgressCost: rnec,
  1101. InternetNetworkEgressCost: inec,
  1102. NatGatewayEgressCost: nge,
  1103. NatGatewayIngressCost: ngi,
  1104. }, nil
  1105. }
  1106. func (aws *AWS) LoadBalancerPricing() (*models.LoadBalancer, error) {
  1107. // TODO: determine key based on function arguments
  1108. // this is something that should be changed in the Provider interface
  1109. key := aws.ClusterRegion + ",LoadBalancerUsage"
  1110. // set default price
  1111. hourlyCost := 0.025
  1112. // use price index when available
  1113. if terms, ok := aws.Pricing[key]; ok {
  1114. hourlyCost = terms.LoadBalancer.Cost
  1115. }
  1116. return &models.LoadBalancer{
  1117. Cost: hourlyCost,
  1118. }, nil
  1119. }
  1120. // AllNodePricing returns all the billing data fetched.
  1121. func (aws *AWS) AllNodePricing() (interface{}, error) {
  1122. aws.DownloadPricingDataLock.RLock()
  1123. defer aws.DownloadPricingDataLock.RUnlock()
  1124. return aws.Pricing, nil
  1125. }
  1126. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  1127. aws.SpotPricingLock.RLock()
  1128. defer aws.SpotPricingLock.RUnlock()
  1129. info, ok := aws.SpotPricingByInstanceID[instanceID]
  1130. return info, ok
  1131. }
  1132. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  1133. aws.RIDataLock.RLock()
  1134. defer aws.RIDataLock.RUnlock()
  1135. data, ok := aws.RIPricingByInstanceID[instanceID]
  1136. return data, ok
  1137. }
  1138. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  1139. aws.SavingsPlanDataLock.RLock()
  1140. defer aws.SavingsPlanDataLock.RUnlock()
  1141. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  1142. return data, ok
  1143. }
  1144. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Key) (*models.Node, models.PricingMetadata, error) {
  1145. key := k.Features()
  1146. meta := models.PricingMetadata{}
  1147. var cost string
  1148. publicPricingFound := true
  1149. c, ok := terms.OnDemand.PriceDimensions[strings.Join([]string{terms.Sku, terms.OnDemand.OfferTermCode, HourlyRateCode}, ".")]
  1150. if ok {
  1151. cost = c.PricePerUnit.USD
  1152. } else {
  1153. // Check for Chinese pricing
  1154. c, ok = terms.OnDemand.PriceDimensions[strings.Join([]string{terms.Sku, terms.OnDemand.OfferTermCode, HourlyRateCodeCn}, ".")]
  1155. if ok {
  1156. cost = c.PricePerUnit.CNY
  1157. } else {
  1158. publicPricingFound = false
  1159. }
  1160. }
  1161. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  1162. var spotcost string
  1163. log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
  1164. arr := strings.Split(spotInfo.Charge, " ")
  1165. if len(arr) == 2 {
  1166. spotcost = arr[0]
  1167. } else {
  1168. log.Infof("Spot data for node %s is missing", k.ID())
  1169. }
  1170. return &models.Node{
  1171. Cost: spotcost,
  1172. VCPU: terms.VCpu,
  1173. RAM: terms.Memory,
  1174. GPU: terms.GPU,
  1175. Storage: terms.Storage,
  1176. BaseCPUPrice: aws.BaseCPUPrice,
  1177. BaseRAMPrice: aws.BaseRAMPrice,
  1178. BaseGPUPrice: aws.BaseGPUPrice,
  1179. UsageType: PreemptibleType,
  1180. }, meta, nil
  1181. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  1182. if aws.SpotRefreshEnabled() {
  1183. log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
  1184. }
  1185. if publicPricingFound {
  1186. // return public price if found
  1187. return &models.Node{
  1188. Cost: cost,
  1189. VCPU: terms.VCpu,
  1190. RAM: terms.Memory,
  1191. GPU: terms.GPU,
  1192. Storage: terms.Storage,
  1193. BaseCPUPrice: aws.BaseCPUPrice,
  1194. BaseRAMPrice: aws.BaseRAMPrice,
  1195. BaseGPUPrice: aws.BaseGPUPrice,
  1196. UsageType: PreemptibleType,
  1197. }, meta, nil
  1198. } else {
  1199. // return defaults if public pricing not found
  1200. if aws.SpotRefreshEnabled() {
  1201. log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
  1202. }
  1203. return &models.Node{
  1204. VCPU: terms.VCpu,
  1205. VCPUCost: aws.BaseSpotCPUPrice,
  1206. RAMCost: aws.BaseSpotRAMPrice,
  1207. RAM: terms.Memory,
  1208. GPU: terms.GPU,
  1209. Storage: terms.Storage,
  1210. BaseCPUPrice: aws.BaseCPUPrice,
  1211. BaseRAMPrice: aws.BaseRAMPrice,
  1212. BaseGPUPrice: aws.BaseGPUPrice,
  1213. UsageType: PreemptibleType,
  1214. }, meta, nil
  1215. }
  1216. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  1217. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  1218. return &models.Node{
  1219. Cost: strCost,
  1220. VCPU: terms.VCpu,
  1221. RAM: terms.Memory,
  1222. GPU: terms.GPU,
  1223. Storage: terms.Storage,
  1224. BaseCPUPrice: aws.BaseCPUPrice,
  1225. BaseRAMPrice: aws.BaseRAMPrice,
  1226. BaseGPUPrice: aws.BaseGPUPrice,
  1227. UsageType: usageType,
  1228. }, meta, nil
  1229. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  1230. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  1231. return &models.Node{
  1232. Cost: strCost,
  1233. VCPU: terms.VCpu,
  1234. RAM: terms.Memory,
  1235. GPU: terms.GPU,
  1236. Storage: terms.Storage,
  1237. BaseCPUPrice: aws.BaseCPUPrice,
  1238. BaseRAMPrice: aws.BaseRAMPrice,
  1239. BaseGPUPrice: aws.BaseGPUPrice,
  1240. UsageType: usageType,
  1241. }, meta, nil
  1242. }
  1243. // Throw error if public price is not found
  1244. if !publicPricingFound {
  1245. return nil, meta, fmt.Errorf("for node \"%s\", cannot find the following key in OnDemand pricing data \"%s\"", k.ID(), k.Features())
  1246. }
  1247. return &models.Node{
  1248. Cost: cost,
  1249. VCPU: terms.VCpu,
  1250. RAM: terms.Memory,
  1251. GPU: terms.GPU,
  1252. Storage: terms.Storage,
  1253. BaseCPUPrice: aws.BaseCPUPrice,
  1254. BaseRAMPrice: aws.BaseRAMPrice,
  1255. BaseGPUPrice: aws.BaseGPUPrice,
  1256. UsageType: usageType,
  1257. }, meta, nil
  1258. }
  1259. func (aws *AWS) getFargatePod(awsKey *awsKey) (*clustercache.Pod, bool) {
  1260. pods := aws.Clientset.GetAllPods()
  1261. for _, pod := range pods {
  1262. if pod.Spec.NodeName == awsKey.Name {
  1263. return pod, true
  1264. }
  1265. }
  1266. return nil, false
  1267. }
  1268. const (
  1269. nodeOSLabel = "kubernetes.io/os"
  1270. nodeArchLabel = "kubernetes.io/arch"
  1271. fargatePodCapacityAnnotation = "CapacityProvisioned"
  1272. )
  1273. // e.g. "0.25vCPU 0.5GB"
  1274. var fargatePodCapacityRegex = regexp.MustCompile("^([0-9.]+)vCPU ([0-9.]+)GB$")
  1275. func (aws *AWS) createFargateNode(awsKey *awsKey, usageType string) (*models.Node, models.PricingMetadata, error) {
  1276. if aws.FargatePricing == nil {
  1277. return nil, models.PricingMetadata{}, fmt.Errorf("fargate pricing not initialized")
  1278. }
  1279. pod, ok := aws.getFargatePod(awsKey)
  1280. if !ok {
  1281. return nil, models.PricingMetadata{}, fmt.Errorf("could not find pod for fargate node %s", awsKey.Name)
  1282. }
  1283. capacity := pod.Annotations[fargatePodCapacityAnnotation]
  1284. match := fargatePodCapacityRegex.FindStringSubmatch(capacity)
  1285. if len(match) == 0 {
  1286. return nil, models.PricingMetadata{}, fmt.Errorf("could not parse pod capacity for fargate node %s", awsKey.Name)
  1287. }
  1288. vCPU, err := strconv.ParseFloat(match[1], 64)
  1289. if err != nil {
  1290. return nil, models.PricingMetadata{}, fmt.Errorf("could not parse vCPU capacity for fargate node %s: %v", awsKey.Name, err)
  1291. }
  1292. memory, err := strconv.ParseFloat(match[2], 64)
  1293. if err != nil {
  1294. return nil, models.PricingMetadata{}, fmt.Errorf("could not parse memory capacity for fargate node %s: %v", awsKey.Name, err)
  1295. }
  1296. region, ok := util.GetRegion(awsKey.Labels)
  1297. if !ok {
  1298. return nil, models.PricingMetadata{}, fmt.Errorf("could not get region for fargate node %s", awsKey.Name)
  1299. }
  1300. nodeOS := awsKey.Labels[nodeOSLabel]
  1301. nodeArch := awsKey.Labels[nodeArchLabel]
  1302. hourlyCPU, hourlyRAM, err := aws.FargatePricing.GetHourlyPricing(region, nodeOS, nodeArch)
  1303. if err != nil {
  1304. return nil, models.PricingMetadata{}, fmt.Errorf("could not get hourly pricing for fargate node %s: %v", awsKey.Name, err)
  1305. }
  1306. cost := hourlyCPU*vCPU + hourlyRAM*memory
  1307. return &models.Node{
  1308. Cost: strconv.FormatFloat(cost, 'f', -1, 64),
  1309. VCPU: strconv.FormatFloat(vCPU, 'f', -1, 64),
  1310. RAM: strconv.FormatFloat(memory, 'f', -1, 64),
  1311. RAMBytes: strconv.FormatFloat(memory*1024*1024*1024, 'f', -1, 64),
  1312. VCPUCost: strconv.FormatFloat(hourlyCPU, 'f', -1, 64),
  1313. RAMCost: strconv.FormatFloat(hourlyRAM, 'f', -1, 64),
  1314. BaseCPUPrice: aws.BaseCPUPrice,
  1315. BaseRAMPrice: aws.BaseRAMPrice,
  1316. BaseGPUPrice: aws.BaseGPUPrice,
  1317. UsageType: usageType,
  1318. }, models.PricingMetadata{}, nil
  1319. }
  1320. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  1321. func (aws *AWS) NodePricing(k models.Key) (*models.Node, models.PricingMetadata, error) {
  1322. aws.DownloadPricingDataLock.RLock()
  1323. defer aws.DownloadPricingDataLock.RUnlock()
  1324. key := k.Features()
  1325. usageType := "ondemand"
  1326. if aws.isPreemptible(key) {
  1327. usageType = PreemptibleType
  1328. }
  1329. meta := models.PricingMetadata{}
  1330. terms, ok := aws.Pricing[key]
  1331. if termsStr, err := json.Marshal(terms); err == nil {
  1332. log.Debugf("NodePricing: for key \"%s\" found the following OnDemand data: %s", key, string(termsStr))
  1333. }
  1334. if ok {
  1335. return aws.createNode(terms, usageType, k)
  1336. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  1337. aws.DownloadPricingDataLock.RUnlock()
  1338. err := aws.DownloadPricingData()
  1339. aws.DownloadPricingDataLock.RLock()
  1340. if err != nil {
  1341. return &models.Node{
  1342. Cost: aws.BaseCPUPrice,
  1343. BaseCPUPrice: aws.BaseCPUPrice,
  1344. BaseRAMPrice: aws.BaseRAMPrice,
  1345. BaseGPUPrice: aws.BaseGPUPrice,
  1346. UsageType: usageType,
  1347. UsesBaseCPUPrice: true,
  1348. }, meta, err
  1349. }
  1350. terms, termsOk := aws.Pricing[key]
  1351. if !termsOk {
  1352. return &models.Node{
  1353. Cost: aws.BaseCPUPrice,
  1354. BaseCPUPrice: aws.BaseCPUPrice,
  1355. BaseRAMPrice: aws.BaseRAMPrice,
  1356. BaseGPUPrice: aws.BaseGPUPrice,
  1357. UsageType: usageType,
  1358. UsesBaseCPUPrice: true,
  1359. }, meta, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  1360. }
  1361. return aws.createNode(terms, usageType, k)
  1362. } else if awsKey, ok := k.(*awsKey); ok && awsKey.isFargateNode() {
  1363. // Since Fargate pricing is listed at AmazonECS and is different from AmazonEC2, we handle it separately here
  1364. return aws.createFargateNode(awsKey, usageType)
  1365. } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
  1366. // we seem to have an issue where this error gets thrown during app start.
  1367. // somehow the ValidPricingKeys map is being accessed before all the pricing data has been downloaded
  1368. return nil, meta, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  1369. }
  1370. }
  1371. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  1372. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  1373. c, err := awsProvider.GetConfig()
  1374. if err != nil {
  1375. return nil, err
  1376. }
  1377. const defaultClusterName = "AWS Cluster #1"
  1378. // Determine cluster name
  1379. clusterName := c.ClusterName
  1380. if clusterName == "" {
  1381. awsClusterID := env.GetAWSClusterID()
  1382. if awsClusterID != "" {
  1383. log.Infof("Returning \"%s\" as ClusterName", awsClusterID)
  1384. clusterName = awsClusterID
  1385. log.Warnf("Warning - %s will be deprecated in a future release. Use %s instead", env.AWSClusterIDEnvVar, coreenv.ClusterIDEnvVar)
  1386. } else if clusterName = coreenv.GetClusterID(); clusterName != "" {
  1387. log.DedupedInfof(5, "Setting cluster name to %s from %s ", clusterName, coreenv.ClusterIDEnvVar)
  1388. } else {
  1389. clusterName = defaultClusterName
  1390. log.DedupedWarningf(5, "Unable to detect cluster name - using default of %s", defaultClusterName)
  1391. log.DedupedWarningf(5, "Please set cluster name through configmap or via %s env var", coreenv.ClusterIDEnvVar)
  1392. }
  1393. }
  1394. // this value requires configuration but is unavailable else where
  1395. clusterAccountID := c.ClusterAccountID
  1396. // Use AthenaProjectID if Cluster Account is not set to support older configs
  1397. if clusterAccountID == "" {
  1398. clusterAccountID = c.AthenaProjectID
  1399. }
  1400. m := make(map[string]string)
  1401. m["name"] = clusterName
  1402. m["provider"] = opencost.AWSProvider
  1403. m["account"] = clusterAccountID
  1404. m["region"] = awsProvider.ClusterRegion
  1405. m["id"] = coreenv.GetClusterID()
  1406. m["remoteReadEnabled"] = strconv.FormatBool(env.IsRemoteEnabled())
  1407. m["provisioner"] = awsProvider.clusterProvisioner
  1408. return m, nil
  1409. }
  1410. // updates the authentication to the latest values (via config or secret)
  1411. func (aws *AWS) ConfigureAuth() error {
  1412. c, err := aws.Config.GetCustomPricingData()
  1413. if err != nil {
  1414. log.Errorf("Error downloading default pricing data: %s", err.Error())
  1415. }
  1416. return aws.ConfigureAuthWith(c)
  1417. }
  1418. // updates the authentication to the latest values (via config or secret)
  1419. func (aws *AWS) ConfigureAuthWith(config *models.CustomPricing) error {
  1420. accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
  1421. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1422. err := coreenv.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1423. if err != nil {
  1424. return err
  1425. }
  1426. err = coreenv.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1427. if err != nil {
  1428. return err
  1429. }
  1430. }
  1431. return nil
  1432. }
  1433. // Gets the aws key id and secret
  1434. func (aws *AWS) getAWSAuth(forceReload bool, cp *models.CustomPricing) (string, string) {
  1435. // 1. Check config values first (set from frontend UI)
  1436. if cp.AwsServiceKeyName != "" && cp.AwsServiceKeySecret != "" {
  1437. aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
  1438. Message: "AWS ServiceKey exists",
  1439. Status: true,
  1440. })
  1441. return cp.AwsServiceKeyName, cp.AwsServiceKeySecret
  1442. }
  1443. // 2. Check for secret
  1444. s, _ := aws.loadAWSAuthSecret(forceReload)
  1445. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1446. aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
  1447. Message: "AWS ServiceKey exists",
  1448. Status: true,
  1449. })
  1450. return s.AccessKeyID, s.SecretAccessKey
  1451. }
  1452. // 3. Fall back to env vars
  1453. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeySecret() == "" {
  1454. aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
  1455. Message: "AWS ServiceKey exists",
  1456. Status: false,
  1457. })
  1458. } else {
  1459. aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
  1460. Message: "AWS ServiceKey exists",
  1461. Status: true,
  1462. })
  1463. }
  1464. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1465. }
  1466. // Load once and cache the result (even on failure). This is an install time secret, so
  1467. // we don't expect the secret to change. If it does, however, we can force reload using
  1468. // the input parameter.
  1469. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1470. if !force && loadedAWSSecret {
  1471. return awsSecret, nil
  1472. }
  1473. loadedAWSSecret = true
  1474. exists, err := fileutil.FileExists(models.AuthSecretPath)
  1475. if !exists || err != nil {
  1476. return nil, fmt.Errorf("Failed to locate service account file: %s", models.AuthSecretPath)
  1477. }
  1478. result, err := os.ReadFile(models.AuthSecretPath)
  1479. if err != nil {
  1480. return nil, err
  1481. }
  1482. var ak AWSAccessKey
  1483. err = json.Unmarshal(result, &ak)
  1484. if err != nil {
  1485. return nil, err
  1486. }
  1487. awsSecret = &ak
  1488. return awsSecret, nil
  1489. }
  1490. func (aws *AWS) getAddressesForRegion(ctx context.Context, region string) (*ec2.DescribeAddressesOutput, error) {
  1491. aak, err := aws.GetAWSAccessKey()
  1492. if err != nil {
  1493. return nil, err
  1494. }
  1495. cfg, err := aak.CreateConfig(region)
  1496. if err != nil {
  1497. return nil, err
  1498. }
  1499. cli := ec2.NewFromConfig(cfg)
  1500. return cli.DescribeAddresses(ctx, &ec2.DescribeAddressesInput{})
  1501. }
  1502. func (aws *AWS) getAllAddresses() ([]*ec2Types.Address, error) {
  1503. aws.ConfigureAuth() // load authentication data into env vars
  1504. regions := aws.Regions()
  1505. addressCh := make(chan *ec2.DescribeAddressesOutput, len(regions))
  1506. errorCh := make(chan error, len(regions))
  1507. var wg sync.WaitGroup
  1508. wg.Add(len(regions))
  1509. // Get volumes from each AWS region
  1510. for _, r := range regions {
  1511. region := r // make a copy of r to avoid capturing loop variable
  1512. // Fetch IP address response and send results and errors to their
  1513. // respective channels
  1514. go func() {
  1515. defer wg.Done()
  1516. defer errs.HandlePanic()
  1517. // Query for first page of volume results
  1518. resp, err := aws.getAddressesForRegion(context.TODO(), region)
  1519. if err != nil {
  1520. var awsErr smithy.APIError
  1521. if errors.As(err, &awsErr) {
  1522. switch awsErr.ErrorCode() {
  1523. case "AuthFailure", "InvalidClientTokenId", "UnauthorizedOperation":
  1524. log.DedupedInfof(5, "Unable to get addresses for region %s due to AWS permissions, error message: %s", region, awsErr.ErrorMessage())
  1525. return
  1526. default:
  1527. errorCh <- err
  1528. return
  1529. }
  1530. } else {
  1531. errorCh <- err
  1532. return
  1533. }
  1534. }
  1535. addressCh <- resp
  1536. }()
  1537. }
  1538. // Close the result channels after everything has been sent
  1539. go func() {
  1540. defer errs.HandlePanic()
  1541. wg.Wait()
  1542. close(errorCh)
  1543. close(addressCh)
  1544. }()
  1545. var addresses []*ec2Types.Address
  1546. for adds := range addressCh {
  1547. for _, add := range adds.Addresses {
  1548. a := add // duplicate to avoid pointer to iterator
  1549. addresses = append(addresses, &a)
  1550. }
  1551. }
  1552. var errs []error
  1553. for err := range errorCh {
  1554. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1555. errs = append(errs, err)
  1556. }
  1557. // Return error if no addresses are returned
  1558. if len(errs) > 0 && len(addresses) == 0 {
  1559. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errs), errs)
  1560. }
  1561. return addresses, nil
  1562. }
  1563. // GetAddresses retrieves EC2 addresses
  1564. func (aws *AWS) GetAddresses() ([]byte, error) {
  1565. addresses, err := aws.getAllAddresses()
  1566. if err != nil {
  1567. return nil, err
  1568. }
  1569. // Format the response this way to match the JSON-encoded formatting of a single response
  1570. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1571. // a "Addresss" key at the top level.
  1572. return json.Marshal(map[string][]*ec2Types.Address{
  1573. "Addresses": addresses,
  1574. })
  1575. }
  1576. func (aws *AWS) isAddressOrphaned(address *ec2Types.Address) bool {
  1577. if address.AssociationId != nil {
  1578. return false
  1579. }
  1580. return true
  1581. }
  1582. func (aws *AWS) getDisksForRegion(ctx context.Context, region string, maxResults int32, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1583. aak, err := aws.GetAWSAccessKey()
  1584. if err != nil {
  1585. return nil, err
  1586. }
  1587. cfg, err := aak.CreateConfig(region)
  1588. if err != nil {
  1589. return nil, err
  1590. }
  1591. cli := ec2.NewFromConfig(cfg)
  1592. return cli.DescribeVolumes(ctx, &ec2.DescribeVolumesInput{
  1593. MaxResults: &maxResults,
  1594. NextToken: nextToken,
  1595. })
  1596. }
  1597. func (aws *AWS) getAllDisks() ([]*ec2Types.Volume, error) {
  1598. aws.ConfigureAuth() // load authentication data into env vars
  1599. regions := aws.Regions()
  1600. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(regions))
  1601. errorCh := make(chan error, len(regions))
  1602. var wg sync.WaitGroup
  1603. wg.Add(len(regions))
  1604. // Get volumes from each AWS region
  1605. for _, r := range regions {
  1606. // Fetch volume response and send results and errors to their
  1607. // respective channels
  1608. go func(region string) {
  1609. defer wg.Done()
  1610. defer errs.HandlePanic()
  1611. // Query for first page of volume results
  1612. resp, err := aws.getDisksForRegion(context.TODO(), region, 1000, nil)
  1613. if err != nil {
  1614. var awsErr smithy.APIError
  1615. if errors.As(err, &awsErr) {
  1616. switch awsErr.ErrorCode() {
  1617. case "AuthFailure", "InvalidClientTokenId", "UnauthorizedOperation":
  1618. log.DedupedInfof(5, "Unable to get disks for region %s due to AWS permissions, error message: %s", region, awsErr.ErrorMessage())
  1619. return
  1620. default:
  1621. errorCh <- err
  1622. return
  1623. }
  1624. } else {
  1625. errorCh <- err
  1626. return
  1627. }
  1628. }
  1629. volumeCh <- resp
  1630. // A NextToken indicates more pages of results. Keep querying
  1631. // until all pages are retrieved.
  1632. for resp.NextToken != nil {
  1633. resp, err = aws.getDisksForRegion(context.TODO(), region, 100, resp.NextToken)
  1634. if err != nil {
  1635. errorCh <- err
  1636. return
  1637. }
  1638. volumeCh <- resp
  1639. }
  1640. }(r)
  1641. }
  1642. // Close the result channels after everything has been sent
  1643. go func() {
  1644. defer errs.HandlePanic()
  1645. wg.Wait()
  1646. close(errorCh)
  1647. close(volumeCh)
  1648. }()
  1649. var volumes []*ec2Types.Volume
  1650. for vols := range volumeCh {
  1651. for _, vol := range vols.Volumes {
  1652. v := vol // duplicate to avoid pointer to iterator
  1653. volumes = append(volumes, &v)
  1654. }
  1655. }
  1656. var errs []error
  1657. for err := range errorCh {
  1658. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1659. errs = append(errs, err)
  1660. }
  1661. // Return error if no volumes are returned
  1662. if len(errs) > 0 && len(volumes) == 0 {
  1663. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errs), errs)
  1664. }
  1665. return volumes, nil
  1666. }
  1667. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1668. func (aws *AWS) GetDisks() ([]byte, error) {
  1669. volumes, err := aws.getAllDisks()
  1670. if err != nil {
  1671. return nil, err
  1672. }
  1673. // Format the response this way to match the JSON-encoded formatting of a single response
  1674. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1675. // a "Volumes" key at the top level.
  1676. return json.Marshal(map[string][]*ec2Types.Volume{
  1677. "Volumes": volumes,
  1678. })
  1679. }
  1680. func (aws *AWS) isDiskOrphaned(vol *ec2Types.Volume) bool {
  1681. // Do not consider volume orphaned if in use
  1682. if vol.State == InUseState {
  1683. return false
  1684. }
  1685. // Do not consider volume orphaned if volume is attached to any attachments
  1686. if len(vol.Attachments) != 0 {
  1687. for _, attachment := range vol.Attachments {
  1688. if attachment.State == AttachedState {
  1689. return false
  1690. }
  1691. }
  1692. }
  1693. return true
  1694. }
  1695. func (aws *AWS) GetOrphanedResources() ([]models.OrphanedResource, error) {
  1696. volumes, volumesErr := aws.getAllDisks()
  1697. addresses, addressesErr := aws.getAllAddresses()
  1698. // If we have any orphaned resources - prioritize returning them over returning errors
  1699. if len(addresses) == 0 && len(volumes) == 0 {
  1700. if volumesErr != nil {
  1701. return nil, volumesErr
  1702. }
  1703. if addressesErr != nil {
  1704. return nil, addressesErr
  1705. }
  1706. }
  1707. var orphanedResources []models.OrphanedResource
  1708. for _, volume := range volumes {
  1709. if aws.isDiskOrphaned(volume) {
  1710. cost, err := aws.findCostForDisk(volume)
  1711. if err != nil {
  1712. return nil, err
  1713. }
  1714. var volumeSize int64
  1715. if volume.Size != nil {
  1716. volumeSize = int64(*volume.Size)
  1717. }
  1718. // This is turning us-east-1a into us-east-1
  1719. var zone string
  1720. if volume.AvailabilityZone != nil {
  1721. zone = *volume.AvailabilityZone
  1722. }
  1723. var region, url string
  1724. region = regionRx.FindString(zone)
  1725. if region != "" {
  1726. url = "https://console.aws.amazon.com/ec2/home?region=" + region + "#Volumes:sort=desc:createTime"
  1727. } else {
  1728. url = "https://console.aws.amazon.com/ec2/home?#Volumes:sort=desc:createTime"
  1729. }
  1730. // output tags as desc
  1731. tags := map[string]string{}
  1732. for _, tag := range volume.Tags {
  1733. tags[*tag.Key] = *tag.Value
  1734. }
  1735. or := models.OrphanedResource{
  1736. Kind: "disk",
  1737. Region: zone,
  1738. Size: &volumeSize,
  1739. DiskName: *volume.VolumeId,
  1740. Url: url,
  1741. MonthlyCost: cost,
  1742. Description: tags,
  1743. }
  1744. orphanedResources = append(orphanedResources, or)
  1745. }
  1746. }
  1747. for _, address := range addresses {
  1748. if aws.isAddressOrphaned(address) {
  1749. cost := AWSHourlyPublicIPCost * timeutil.HoursPerMonth
  1750. desc := map[string]string{}
  1751. for _, tag := range address.Tags {
  1752. if tag.Key == nil {
  1753. continue
  1754. }
  1755. if tag.Value == nil {
  1756. desc[*tag.Key] = ""
  1757. } else {
  1758. desc[*tag.Key] = *tag.Value
  1759. }
  1760. }
  1761. or := models.OrphanedResource{
  1762. Kind: "address",
  1763. Address: *address.PublicIp,
  1764. Description: desc,
  1765. Url: "http://console.aws.amazon.com/ec2/home?#Addresses",
  1766. MonthlyCost: &cost,
  1767. }
  1768. orphanedResources = append(orphanedResources, or)
  1769. }
  1770. }
  1771. return orphanedResources, nil
  1772. }
  1773. func (aws *AWS) findCostForDisk(disk *ec2Types.Volume) (*float64, error) {
  1774. // todo: use AWS pricing from all regions
  1775. if disk.AvailabilityZone == nil {
  1776. return nil, fmt.Errorf("nil region")
  1777. }
  1778. if disk.Size == nil {
  1779. return nil, fmt.Errorf("nil disk size")
  1780. }
  1781. class := volTypes[string(disk.VolumeType)]
  1782. key := aws.ClusterRegion + "," + class
  1783. pricing, ok := aws.Pricing[key]
  1784. if !ok {
  1785. return nil, fmt.Errorf("no pricing data for key '%s'", key)
  1786. }
  1787. if pricing == nil {
  1788. return nil, fmt.Errorf("nil pricing data for key '%s'", key)
  1789. }
  1790. if pricing.PV == nil {
  1791. return nil, fmt.Errorf("pricing for key '%s' has nil PV", key)
  1792. }
  1793. priceStr := pricing.PV.Cost
  1794. price, err := strconv.ParseFloat(priceStr, 64)
  1795. if err != nil {
  1796. return nil, err
  1797. }
  1798. cost := price * timeutil.HoursPerMonth * float64(*disk.Size)
  1799. return &cost, nil
  1800. }
  1801. // QueryAthenaPaginated executes athena query and processes results.
  1802. func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
  1803. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  1804. if err != nil {
  1805. return err
  1806. }
  1807. if awsAthenaInfo.AthenaDatabase == "" || awsAthenaInfo.AthenaTable == "" || awsAthenaInfo.AthenaRegion == "" ||
  1808. awsAthenaInfo.AthenaBucketName == "" || awsAthenaInfo.AccountID == "" {
  1809. return fmt.Errorf("QueryAthenaPaginated: athena configuration incomplete")
  1810. }
  1811. queryExecutionCtx := &athenaTypes.QueryExecutionContext{
  1812. Database: awsSDK.String(awsAthenaInfo.AthenaDatabase),
  1813. }
  1814. if awsAthenaInfo.AthenaCatalog != "" {
  1815. queryExecutionCtx.Catalog = awsSDK.String(awsAthenaInfo.AthenaCatalog)
  1816. }
  1817. resultConfiguration := &athenaTypes.ResultConfiguration{
  1818. OutputLocation: awsSDK.String(awsAthenaInfo.AthenaBucketName),
  1819. }
  1820. startQueryExecutionInput := &athena.StartQueryExecutionInput{
  1821. QueryString: awsSDK.String(query),
  1822. QueryExecutionContext: queryExecutionCtx,
  1823. ResultConfiguration: resultConfiguration,
  1824. }
  1825. // Only set if there is a value, the default input is nil which defaults to the 'primary' workgroup
  1826. if awsAthenaInfo.AthenaWorkgroup != "" {
  1827. startQueryExecutionInput.WorkGroup = awsSDK.String(awsAthenaInfo.AthenaWorkgroup)
  1828. }
  1829. // Create Athena Client
  1830. cfg, err := awsAthenaInfo.CreateConfig()
  1831. if err != nil {
  1832. log.Errorf("Could not retrieve Athena Configuration: %s", err.Error())
  1833. }
  1834. cli := athena.NewFromConfig(cfg)
  1835. // Query Athena
  1836. startQueryExecutionOutput, err := cli.StartQueryExecution(ctx, startQueryExecutionInput)
  1837. if err != nil {
  1838. return fmt.Errorf("QueryAthenaPaginated: start query error: %s", err.Error())
  1839. }
  1840. err = waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
  1841. if err != nil {
  1842. return fmt.Errorf("QueryAthenaPaginated: query execution error: %s", err.Error())
  1843. }
  1844. queryResultsInput := &athena.GetQueryResultsInput{
  1845. QueryExecutionId: startQueryExecutionOutput.QueryExecutionId,
  1846. }
  1847. getQueryResultsPaginator := athena.NewGetQueryResultsPaginator(cli, queryResultsInput)
  1848. for getQueryResultsPaginator.HasMorePages() {
  1849. pg, err := getQueryResultsPaginator.NextPage(ctx)
  1850. if err != nil {
  1851. log.Errorf("QueryAthenaPaginated: NextPage error: %s", err.Error())
  1852. continue
  1853. }
  1854. fn(pg)
  1855. }
  1856. return nil
  1857. }
  1858. type SavingsPlanData struct {
  1859. ResourceID string
  1860. EffectiveCost float64
  1861. SavingsPlanARN string
  1862. MostRecentDate string
  1863. }
  1864. func (aws *AWS) GetSavingsPlanDataFromAthena() error {
  1865. cfg, err := aws.GetConfig()
  1866. if err != nil {
  1867. aws.RIPricingError = err
  1868. return err
  1869. }
  1870. if cfg.AthenaBucketName == "" {
  1871. err = ErrNoAthenaBucket
  1872. aws.RIPricingError = err
  1873. return err
  1874. }
  1875. if aws.SavingsPlanDataByInstanceID == nil {
  1876. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1877. }
  1878. tNow := time.Now()
  1879. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1880. start := tOneDayAgo.Format("2006-01-02")
  1881. end := tNow.Format("2006-01-02")
  1882. // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
  1883. //
  1884. q := `SELECT
  1885. line_item_usage_start_date,
  1886. savings_plan_savings_plan_a_r_n,
  1887. line_item_resource_id,
  1888. savings_plan_savings_plan_rate
  1889. FROM %s as cost_data
  1890. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1891. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1892. line_item_usage_start_date DESC`
  1893. page := 0
  1894. mostRecentDate := ""
  1895. processResults := func(op *athena.GetQueryResultsOutput) bool {
  1896. if op == nil {
  1897. log.Errorf("GetSavingsPlanDataFromAthena: Athena page is nil")
  1898. return false
  1899. } else if op.ResultSet == nil {
  1900. log.Errorf("GetSavingsPlanDataFromAthena: Athena page.ResultSet is nil")
  1901. return false
  1902. }
  1903. aws.SavingsPlanDataLock.Lock()
  1904. defer aws.SavingsPlanDataLock.Unlock()
  1905. if page == 0 {
  1906. aws.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
  1907. }
  1908. iter := op.ResultSet.Rows
  1909. if page == 0 && len(iter) > 0 {
  1910. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  1911. }
  1912. page++
  1913. for _, r := range iter {
  1914. d := *r.Data[0].VarCharValue
  1915. if mostRecentDate == "" {
  1916. mostRecentDate = d
  1917. } else if mostRecentDate != d { // Get all most recent assignments
  1918. break
  1919. }
  1920. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1921. if err != nil {
  1922. log.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1923. }
  1924. r := &SavingsPlanData{
  1925. ResourceID: *r.Data[2].VarCharValue,
  1926. EffectiveCost: cost,
  1927. SavingsPlanARN: *r.Data[1].VarCharValue,
  1928. MostRecentDate: d,
  1929. }
  1930. aws.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1931. }
  1932. log.Debugf("Found %d savings plan applied instances", len(aws.SavingsPlanDataByInstanceID))
  1933. for k, r := range aws.SavingsPlanDataByInstanceID {
  1934. log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1935. }
  1936. return true
  1937. }
  1938. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1939. log.Debugf("Running Query: %s", query)
  1940. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  1941. if err != nil {
  1942. aws.RIPricingError = err
  1943. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1944. }
  1945. return nil
  1946. }
  1947. type RIData struct {
  1948. ResourceID string
  1949. EffectiveCost float64
  1950. ReservationARN string
  1951. MostRecentDate string
  1952. }
  1953. func (aws *AWS) GetReservationDataFromAthena() error {
  1954. cfg, err := aws.GetConfig()
  1955. if err != nil {
  1956. aws.RIPricingError = err
  1957. return err
  1958. }
  1959. if cfg.AthenaBucketName == "" {
  1960. err = ErrNoAthenaBucket
  1961. aws.RIPricingError = err
  1962. return err
  1963. }
  1964. // Query for all column names in advance in order to validate configured
  1965. // label columns
  1966. columns, _ := aws.fetchColumns()
  1967. if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
  1968. err = fmt.Errorf("no reservation data available in Athena")
  1969. aws.RIPricingError = err
  1970. return err
  1971. }
  1972. if aws.RIPricingByInstanceID == nil {
  1973. aws.RIPricingByInstanceID = make(map[string]*RIData)
  1974. }
  1975. tNow := time.Now()
  1976. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1977. start := tOneDayAgo.Format("2006-01-02")
  1978. end := tNow.Format("2006-01-02")
  1979. q := `SELECT
  1980. line_item_usage_start_date,
  1981. reservation_reservation_a_r_n,
  1982. line_item_resource_id,
  1983. reservation_effective_cost
  1984. FROM %s as cost_data
  1985. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1986. AND reservation_reservation_a_r_n <> '' ORDER BY
  1987. line_item_usage_start_date DESC`
  1988. page := 0
  1989. mostRecentDate := ""
  1990. processResults := func(op *athena.GetQueryResultsOutput) bool {
  1991. if op == nil {
  1992. log.Errorf("GetReservationDataFromAthena: Athena page is nil")
  1993. return false
  1994. } else if op.ResultSet == nil {
  1995. log.Errorf("GetReservationDataFromAthena: Athena page.ResultSet is nil")
  1996. return false
  1997. }
  1998. aws.RIDataLock.Lock()
  1999. defer aws.RIDataLock.Unlock()
  2000. if page == 0 {
  2001. aws.RIPricingByInstanceID = make(map[string]*RIData) // Clean out the old data and only report a RI price if its in the most recent run.
  2002. }
  2003. iter := op.ResultSet.Rows
  2004. if page == 0 && len(iter) > 0 {
  2005. iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
  2006. }
  2007. page++
  2008. for _, r := range iter {
  2009. d := *r.Data[0].VarCharValue
  2010. if mostRecentDate == "" {
  2011. mostRecentDate = d
  2012. } else if mostRecentDate != d { // Get all most recent assignments
  2013. break
  2014. }
  2015. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  2016. if err != nil {
  2017. log.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  2018. }
  2019. r := &RIData{
  2020. ResourceID: *r.Data[2].VarCharValue,
  2021. EffectiveCost: cost,
  2022. ReservationARN: *r.Data[1].VarCharValue,
  2023. MostRecentDate: d,
  2024. }
  2025. aws.RIPricingByInstanceID[r.ResourceID] = r
  2026. }
  2027. log.Debugf("Found %d reserved instances", len(aws.RIPricingByInstanceID))
  2028. for k, r := range aws.RIPricingByInstanceID {
  2029. log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  2030. }
  2031. return true
  2032. }
  2033. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  2034. log.Debugf("Running Query: %s", query)
  2035. err = aws.QueryAthenaPaginated(context.TODO(), query, processResults)
  2036. if err != nil {
  2037. aws.RIPricingError = err
  2038. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  2039. }
  2040. aws.RIPricingError = nil
  2041. return nil
  2042. }
  2043. // fetchColumns returns a list of the names of all columns in the configured
  2044. // Athena tables
  2045. func (aws *AWS) fetchColumns() (map[string]bool, error) {
  2046. columnSet := map[string]bool{}
  2047. awsAthenaInfo, err := aws.GetAWSAthenaInfo()
  2048. if err != nil {
  2049. return nil, err
  2050. }
  2051. // This Query is supported by Athena tables and views
  2052. q := `SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'`
  2053. query := fmt.Sprintf(q, awsAthenaInfo.AthenaDatabase, awsAthenaInfo.AthenaTable)
  2054. pageNum := 0
  2055. athenaErr := aws.QueryAthenaPaginated(context.TODO(), query, func(page *athena.GetQueryResultsOutput) bool {
  2056. if page == nil {
  2057. log.Errorf("fetchColumns: Athena page is nil")
  2058. return false
  2059. } else if page.ResultSet == nil {
  2060. log.Errorf("fetchColumns: Athena page.ResultSet is nil")
  2061. return false
  2062. }
  2063. // remove header row 'column_name'
  2064. rows := page.ResultSet.Rows[1:]
  2065. for _, row := range rows {
  2066. columnSet[*row.Data[0].VarCharValue] = true
  2067. }
  2068. pageNum++
  2069. return true
  2070. })
  2071. if athenaErr != nil {
  2072. return columnSet, athenaErr
  2073. }
  2074. if len(columnSet) == 0 {
  2075. log.Infof("No columns retrieved from Athena")
  2076. }
  2077. return columnSet, nil
  2078. }
  2079. type spotInfo struct {
  2080. Timestamp string `csv:"Timestamp"`
  2081. UsageType string `csv:"UsageType"`
  2082. Operation string `csv:"Operation"`
  2083. InstanceID string `csv:"InstanceID"`
  2084. MyBidID string `csv:"MyBidID"`
  2085. MyMaxPrice string `csv:"MyMaxPrice"`
  2086. MarketPrice string `csv:"MarketPrice"`
  2087. Charge string `csv:"Charge"`
  2088. Version string `csv:"Version"`
  2089. }
  2090. func (aws *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
  2091. aws.ConfigureAuth() // configure aws api authentication by setting env vars
  2092. s3Prefix := projectID
  2093. if len(prefix) != 0 {
  2094. s3Prefix = prefix + "/" + s3Prefix
  2095. }
  2096. aak, err := aws.GetAWSAccessKey()
  2097. if err != nil {
  2098. return nil, err
  2099. }
  2100. cfg, err := aak.CreateConfig(region)
  2101. if err != nil {
  2102. return nil, err
  2103. }
  2104. cli := s3.NewFromConfig(cfg)
  2105. downloader := manager.NewDownloader(cli)
  2106. tNow := time.Now()
  2107. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  2108. ls := &s3.ListObjectsInput{
  2109. Bucket: awsSDK.String(bucket),
  2110. Prefix: awsSDK.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  2111. }
  2112. ls2 := &s3.ListObjectsInput{
  2113. Bucket: awsSDK.String(bucket),
  2114. Prefix: awsSDK.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  2115. }
  2116. lso, err := cli.ListObjects(context.TODO(), ls)
  2117. if err != nil {
  2118. aws.ServiceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
  2119. Message: "Bucket List Permissions Available",
  2120. Status: false,
  2121. AdditionalInfo: err.Error(),
  2122. })
  2123. return nil, err
  2124. } else {
  2125. aws.ServiceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
  2126. Message: "Bucket List Permissions Available",
  2127. Status: true,
  2128. })
  2129. }
  2130. lsoLen := len(lso.Contents)
  2131. log.Debugf("Found %d spot data files from yesterday", lsoLen)
  2132. if lsoLen == 0 {
  2133. log.Debugf("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  2134. }
  2135. lso2, err := cli.ListObjects(context.TODO(), ls2)
  2136. if err != nil {
  2137. return nil, err
  2138. }
  2139. lso2Len := len(lso2.Contents)
  2140. log.Debugf("Found %d spot data files from today", lso2Len)
  2141. if lso2Len == 0 {
  2142. log.Debugf("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  2143. }
  2144. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  2145. var keys []*string
  2146. for _, obj := range lso.Contents {
  2147. keys = append(keys, obj.Key)
  2148. }
  2149. for _, obj := range lso2.Contents {
  2150. keys = append(keys, obj.Key)
  2151. }
  2152. header, err := csvutil.Header(spotInfo{}, "csv")
  2153. if err != nil {
  2154. return nil, err
  2155. }
  2156. fieldsPerRecord := len(header)
  2157. spots := make(map[string]*spotInfo)
  2158. for _, key := range keys {
  2159. getObj := &s3.GetObjectInput{
  2160. Bucket: awsSDK.String(bucket),
  2161. Key: key,
  2162. }
  2163. buf := manager.NewWriteAtBuffer([]byte{})
  2164. _, err := downloader.Download(context.TODO(), buf, getObj)
  2165. if err != nil {
  2166. aws.ServiceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
  2167. Message: "Object Get Permissions Available",
  2168. Status: false,
  2169. AdditionalInfo: err.Error(),
  2170. })
  2171. return nil, err
  2172. } else {
  2173. aws.ServiceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
  2174. Message: "Object Get Permissions Available",
  2175. Status: true,
  2176. })
  2177. }
  2178. r := bytes.NewReader(buf.Bytes())
  2179. gr, err := gzip.NewReader(r)
  2180. if err != nil {
  2181. return nil, err
  2182. }
  2183. csvReader := csv.NewReader(gr)
  2184. csvReader.Comma = '\t'
  2185. csvReader.FieldsPerRecord = fieldsPerRecord
  2186. dec, err := csvutil.NewDecoder(csvReader, header...)
  2187. if err != nil {
  2188. return nil, err
  2189. }
  2190. var foundVersion string
  2191. for {
  2192. spot := spotInfo{}
  2193. err := dec.Decode(&spot)
  2194. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  2195. if err == io.EOF {
  2196. break
  2197. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  2198. rec := dec.Record()
  2199. // the first two "Record()" will be the comment lines
  2200. // and they show up as len() == 1
  2201. // the first of which is "#Version"
  2202. // the second of which is "#Fields: "
  2203. if len(rec) != 1 {
  2204. log.Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  2205. continue
  2206. }
  2207. if len(foundVersion) == 0 {
  2208. spotFeedVersion := rec[0]
  2209. log.Debugf("Spot feed version is \"%s\"", spotFeedVersion)
  2210. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  2211. if matches != nil {
  2212. foundVersion = matches[1]
  2213. if foundVersion != supportedSpotFeedVersion {
  2214. log.Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  2215. break
  2216. }
  2217. }
  2218. continue
  2219. } else if strings.Index(rec[0], "#") == 0 {
  2220. continue
  2221. } else {
  2222. log.Infof("skipping non-TSV line: %s", rec)
  2223. continue
  2224. }
  2225. } else if err != nil {
  2226. log.Warnf("Error during spot info decode: %+v", err)
  2227. continue
  2228. }
  2229. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  2230. spots[spot.InstanceID] = &spot
  2231. }
  2232. gr.Close()
  2233. }
  2234. return spots, nil
  2235. }
  2236. // ApplyReservedInstancePricing TODO
  2237. func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*models.Node) {
  2238. }
  2239. func (aws *AWS) ServiceAccountStatus() *models.ServiceAccountStatus {
  2240. return aws.ServiceAccountChecks.GetStatus()
  2241. }
  2242. func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  2243. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  2244. }
  2245. // Regions returns a predefined list of AWS regions
  2246. func (aws *AWS) Regions() []string {
  2247. regionOverrides := env.GetRegionOverrideList()
  2248. if len(regionOverrides) > 0 {
  2249. log.Debugf("Overriding AWS regions with configured region list: %+v", regionOverrides)
  2250. return regionOverrides
  2251. }
  2252. return awsRegions
  2253. }
  2254. // PricingSourceSummary returns the pricing source summary for the provider.
  2255. // The summary represents what was _parsed_ from the pricing source, not
  2256. // everything that was _available_ in the pricing source.
  2257. func (aws *AWS) PricingSourceSummary() interface{} {
  2258. // encode the pricing source summary as a JSON string
  2259. return aws.Pricing
  2260. }