awsprovider.go 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "k8s.io/klog"
  19. "github.com/kubecost/cost-model/clustercache"
  20. "github.com/aws/aws-sdk-go/aws"
  21. "github.com/aws/aws-sdk-go/aws/awserr"
  22. "github.com/aws/aws-sdk-go/aws/session"
  23. "github.com/aws/aws-sdk-go/service/athena"
  24. "github.com/aws/aws-sdk-go/service/ec2"
  25. "github.com/aws/aws-sdk-go/service/s3"
  26. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  27. "github.com/jszwec/csvutil"
  28. v1 "k8s.io/api/core/v1"
  29. )
  // Environment variable names and fixed values used by the AWS provider.
const (
	// awsAccessKeyIDEnvVar and awsAccessKeySecretEnvVar are the names of the
	// environment variables that carry AWS credentials.
	awsAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
	awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"

	// awsReservedInstancePricePerHour is a fixed per-hour reserved instance price.
	// NOTE(review): presumably USD per hour — confirm against its users.
	awsReservedInstancePricePerHour = 0.0287

	// supportedSpotFeedVersion is the spot data feed version string this
	// provider is written against.
	supportedSpotFeedVersion = "1"

	// SpotInfoUpdateType and AthenaInfoUpdateType are the updateType values
	// that UpdateConfig handles with dedicated payload structs.
	SpotInfoUpdateType   = "spotinfo"
	AthenaInfoUpdateType = "athenainfo"
)
  36. // AWS represents an Amazon Provider
  37. type AWS struct {
  38. Pricing map[string]*AWSProductTerms
  39. SpotPricingByInstanceID map[string]*spotInfo
  40. ValidPricingKeys map[string]bool
  41. Clientset clustercache.ClusterCache
  42. BaseCPUPrice string
  43. BaseRAMPrice string
  44. BaseGPUPrice string
  45. BaseSpotCPUPrice string
  46. BaseSpotRAMPrice string
  47. SpotLabelName string
  48. SpotLabelValue string
  49. ServiceKeyName string
  50. ServiceKeySecret string
  51. SpotDataRegion string
  52. SpotDataBucket string
  53. SpotDataPrefix string
  54. ProjectID string
  55. DownloadPricingDataLock sync.RWMutex
  56. ReservedInstances []*AWSReservedInstance
  57. *CustomProvider
  58. }
  59. // AWSPricing maps a k8s node to an AWS Pricing "product"
  60. type AWSPricing struct {
  61. Products map[string]*AWSProduct `json:"products"`
  62. Terms AWSPricingTerms `json:"terms"`
  63. }
  64. // AWSProduct represents a purchased SKU
  65. type AWSProduct struct {
  66. Sku string `json:"sku"`
  67. Attributes AWSProductAttributes `json:"attributes"`
  68. }
  // AWSProductAttributes carries the descriptive attributes of a product.
// DownloadPricingData uses Location, InstanceType, OperatingSystem,
// PreInstalledSw and UsageType to decide which node or volume key a
// product belongs to. All values are kept as the strings found in the JSON.
type AWSProductAttributes struct {
	Location        string `json:"location"`
	InstanceType    string `json:"instanceType"`
	Memory          string `json:"memory"`
	Storage         string `json:"storage"`
	VCpu            string `json:"vcpu"`
	UsageType       string `json:"usagetype"`
	OperatingSystem string `json:"operatingSystem"`
	PreInstalledSw  string `json:"preInstalledSw"`
	InstanceFamily  string `json:"instanceFamily"`
	// GPU is the number of GPUs on the instance.
	GPU string `json:"gpu"`
}
  82. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  83. type AWSPricingTerms struct {
  84. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  85. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  86. }
  87. // AWSOfferTerm is a sku extension used to pay for the node.
  88. type AWSOfferTerm struct {
  89. Sku string `json:"sku"`
  90. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  91. }
  92. // AWSRateCode encodes data about the price of a product
  93. type AWSRateCode struct {
  94. Unit string `json:"unit"`
  95. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  96. }
  // AWSCurrencyCode holds a price as a decimal string per currency.
// Only USD is decoded. (TODO: support non-USD)
type AWSCurrencyCode struct {
	USD string `json:"USD"`
}
  101. // AWSProductTerms represents the full terms of the product
  102. type AWSProductTerms struct {
  103. Sku string `json:"sku"`
  104. OnDemand *AWSOfferTerm `json:"OnDemand"`
  105. Reserved *AWSOfferTerm `json:"Reserved"`
  106. Memory string `json:"memory"`
  107. Storage string `json:"storage"`
  108. VCpu string `json:"vcpu"`
  109. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  110. PV *PV `json:"pv"`
  111. }
  // Cluster identification and price list rate-code suffixes.
const (
	// ClusterIdEnvVar is the environment variable in which one can manually
	// set the ClusterId.
	ClusterIdEnvVar = "AWS_CLUSTER_ID"

	// OnDemandRateCode is the suffix appended to a node SKU to form its
	// on-demand offer code.
	OnDemandRateCode = ".JRTCKXETXF"
	// ReservedRateCode is the suffix appended to a node SKU for reserved offers.
	ReservedRateCode = ".38NPMPTW36"
	// HourlyRateCode is appended after the offer code to address the
	// price dimension that is read as the hourly rate.
	HourlyRateCode = ".6YS6EN2CT7"
)
  // volTypes is a two-way lookup between AWS EBS UsageTypes and EBS volume
// types as they appear in Kubernetes storage class parameters and the
// EC2 API. Both directions live in the same map.
var volTypes = map[string]string{
	// UsageType -> volume type
	"EBS:VolumeUsage.gp2":    "gp2",
	"EBS:VolumeUsage":        "standard",
	"EBS:VolumeUsage.sc1":    "sc1",
	"EBS:VolumeP-IOPS.piops": "io1",
	"EBS:VolumeUsage.st1":    "st1",
	"EBS:VolumeUsage.piops":  "io1",

	// volume type -> UsageType
	"gp2":      "EBS:VolumeUsage.gp2",
	"standard": "EBS:VolumeUsage",
	"sc1":      "EBS:VolumeUsage.sc1",
	"io1":      "EBS:VolumeUsage.piops",
	"st1":      "EBS:VolumeUsage.st1",
}
  // locationToRegion translates the human-readable location names used by
// AWS billing into region identifiers.
var locationToRegion = map[string]string{
	// United States
	"US East (Ohio)":          "us-east-2",
	"US East (N. Virginia)":   "us-east-1",
	"US West (N. California)": "us-west-1",
	"US West (Oregon)":        "us-west-2",

	// Asia Pacific
	"Asia Pacific (Hong Kong)":   "ap-east-1",
	"Asia Pacific (Mumbai)":      "ap-south-1",
	"Asia Pacific (Osaka-Local)": "ap-northeast-3",
	"Asia Pacific (Seoul)":       "ap-northeast-2",
	"Asia Pacific (Singapore)":   "ap-southeast-1",
	"Asia Pacific (Sydney)":      "ap-southeast-2",
	"Asia Pacific (Tokyo)":       "ap-northeast-1",

	// Canada and China
	"Canada (Central)": "ca-central-1",
	"China (Beijing)":  "cn-north-1",
	"China (Ningxia)":  "cn-northwest-1",

	// Europe
	"EU (Frankfurt)": "eu-central-1",
	"EU (Ireland)":   "eu-west-1",
	"EU (London)":    "eu-west-2",
	"EU (Paris)":     "eu-west-3",
	"EU (Stockholm)": "eu-north-1",

	// South America
	"South America (Sao Paulo)": "sa-east-1",

	// GovCloud
	"AWS GovCloud (US-East)": "us-gov-east-1",
	"AWS GovCloud (US)":      "us-gov-west-1",
}
  // regionToBillingRegionCode maps a region identifier to a short billing
// region code. Some regions map to the empty string.
// NOTE(review): presumably these are the prefixes AWS billing puts in front
// of usage types (e.g. "USE2-BoxUsage") — confirm against the callers.
var regionToBillingRegionCode = map[string]string{
	// United States
	"us-east-2": "USE2",
	"us-east-1": "",
	"us-west-1": "USW1",
	"us-west-2": "USW2",

	// Asia Pacific
	"ap-east-1":      "APE1",
	"ap-south-1":     "APS3",
	"ap-northeast-3": "APN3",
	"ap-northeast-2": "APN2",
	"ap-southeast-1": "APS1",
	"ap-southeast-2": "APS2",
	"ap-northeast-1": "APN1",

	// Canada and China
	"ca-central-1":   "CAN1",
	"cn-north-1":     "",
	"cn-northwest-1": "",

	// Europe
	"eu-central-1": "EUC1",
	"eu-west-1":    "EU",
	"eu-west-2":    "EUW2",
	"eu-west-3":    "EUW3",
	"eu-north-1":   "EUN1",

	// South America
	"sa-east-1": "SAE1",

	// GovCloud
	"us-gov-east-1": "UGE1",
	"us-gov-west-1": "UGW1",
}
  186. func (aws *AWS) GetLocalStorageQuery(offset string) (string, error) {
  187. return "", nil
  188. }
  189. // KubeAttrConversion maps the k8s labels for region to an aws region
  190. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  191. operatingSystem = strings.ToLower(operatingSystem)
  192. region := locationToRegion[location]
  193. return region + "," + instanceType + "," + operatingSystem
  194. }
  // AwsSpotFeedInfo is the JSON payload UpdateConfig decodes for the
// "spotinfo" update type. Note that AccountID travels under the JSON key
// "projectID".
type AwsSpotFeedInfo struct {
	BucketName       string `json:"bucketName"`
	Prefix           string `json:"prefix"`
	Region           string `json:"region"`
	AccountID        string `json:"projectID"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	SpotLabel        string `json:"spotLabel"`
	SpotLabelValue   string `json:"spotLabelValue"`
}
  // AwsAthenaInfo is the JSON payload UpdateConfig decodes for the
// "athenainfo" update type. Note that AccountID travels under the JSON key
// "projectID".
type AwsAthenaInfo struct {
	AthenaBucketName string `json:"athenaBucketName"`
	AthenaRegion     string `json:"athenaRegion"`
	AthenaDatabase   string `json:"athenaDatabase"`
	AthenaTable      string `json:"athenaTable"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	AccountID        string `json:"projectID"`
}
  214. func (aws *AWS) GetManagementPlatform() (string, error) {
  215. nodes := aws.Clientset.GetAllNodes()
  216. if len(nodes) > 0 {
  217. n := nodes[0]
  218. version := n.Status.NodeInfo.KubeletVersion
  219. if strings.Contains(version, "eks") {
  220. return "eks", nil
  221. }
  222. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  223. return "kops", nil
  224. }
  225. }
  226. return "", nil
  227. }
  228. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  229. c, err := GetDefaultPricingData("aws.json")
  230. if c.Discount == "" {
  231. c.Discount = "0%"
  232. }
  233. if c.NegotiatedDiscount == "" {
  234. c.NegotiatedDiscount = "0%"
  235. }
  236. if err != nil {
  237. return nil, err
  238. }
  239. return c, nil
  240. }
  241. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  242. c, err := GetDefaultPricingData("aws.json")
  243. if err != nil {
  244. return nil, err
  245. }
  246. if updateType == SpotInfoUpdateType {
  247. a := AwsSpotFeedInfo{}
  248. err := json.NewDecoder(r).Decode(&a)
  249. if err != nil {
  250. return nil, err
  251. }
  252. if err != nil {
  253. return nil, err
  254. }
  255. c.ServiceKeyName = a.ServiceKeyName
  256. c.ServiceKeySecret = a.ServiceKeySecret
  257. c.SpotDataPrefix = a.Prefix
  258. c.SpotDataBucket = a.BucketName
  259. c.ProjectID = a.AccountID
  260. c.SpotDataRegion = a.Region
  261. c.SpotLabel = a.SpotLabel
  262. c.SpotLabelValue = a.SpotLabelValue
  263. } else if updateType == AthenaInfoUpdateType {
  264. a := AwsAthenaInfo{}
  265. err := json.NewDecoder(r).Decode(&a)
  266. if err != nil {
  267. return nil, err
  268. }
  269. c.AthenaBucketName = a.AthenaBucketName
  270. c.AthenaRegion = a.AthenaRegion
  271. c.AthenaDatabase = a.AthenaDatabase
  272. c.AthenaTable = a.AthenaTable
  273. c.ServiceKeyName = a.ServiceKeyName
  274. c.ServiceKeySecret = a.ServiceKeySecret
  275. c.ProjectID = a.AccountID
  276. } else {
  277. a := make(map[string]interface{})
  278. err = json.NewDecoder(r).Decode(&a)
  279. if err != nil {
  280. return nil, err
  281. }
  282. for k, v := range a {
  283. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  284. vstr, ok := v.(string)
  285. if ok {
  286. err := SetCustomPricingField(c, kUpper, vstr)
  287. if err != nil {
  288. return nil, err
  289. }
  290. } else {
  291. sci := v.(map[string]interface{})
  292. sc := make(map[string]string)
  293. for k, val := range sci {
  294. sc[k] = val.(string)
  295. }
  296. c.SharedCosts = sc //todo: support reflection/multiple map fields
  297. }
  298. }
  299. }
  300. cj, err := json.Marshal(c)
  301. if err != nil {
  302. return nil, err
  303. }
  304. path := os.Getenv("CONFIG_PATH")
  305. if path == "" {
  306. path = "/models/"
  307. }
  308. path += "aws.json"
  309. remoteEnabled := os.Getenv(remoteEnabled)
  310. if remoteEnabled == "true" {
  311. err = UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
  312. if err != nil {
  313. return nil, err
  314. }
  315. }
  316. err = ioutil.WriteFile(path, cj, 0644)
  317. if err != nil {
  318. return nil, err
  319. }
  320. return c, nil
  321. }
  // awsKey identifies a node for pricing lookups. It is built by AWS.GetKey
// from the node's labels.
type awsKey struct {
	// SpotLabelName/SpotLabelValue are the configured label pair that marks
	// a node as a spot instance.
	SpotLabelName, SpotLabelValue string
	// Labels are the node labels.
	Labels map[string]string
	// ProviderID has the form aws:///<zone>/<instance-id>.
	ProviderID string
}
  328. func (k *awsKey) GPUType() string {
  329. return ""
  330. }
  331. func (k *awsKey) ID() string {
  332. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  333. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  334. if matchNum == 2 {
  335. return group
  336. }
  337. }
  338. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  339. return ""
  340. }
  341. func (k *awsKey) Features() string {
  342. instanceType := k.Labels[v1.LabelInstanceType]
  343. var operatingSystem string
  344. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  345. if !ok {
  346. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  347. }
  348. region := k.Labels[v1.LabelZoneRegion]
  349. key := region + "," + instanceType + "," + operatingSystem
  350. usageType := "preemptible"
  351. spotKey := key + "," + usageType
  352. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  353. return spotKey
  354. }
  355. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  356. return spotKey
  357. }
  358. return key
  359. }
  360. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  361. pricing, ok := aws.Pricing[pvk.Features()]
  362. if !ok {
  363. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  364. return &PV{}, nil
  365. }
  366. return pricing.PV, nil
  367. }
  // awsPVKey identifies a persistent volume for pricing lookups. It is built
// by AWS.GetPVKey.
type awsPVKey struct {
	// Labels are the volume's labels; StorageClassParameters are the
	// parameters of its storage class (the "type" entry selects the EBS
	// volume type).
	Labels, StorageClassParameters map[string]string
	// StorageClassName and Name are copied from the volume.
	StorageClassName, Name string
}
  374. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
  375. return &awsPVKey{
  376. Labels: pv.Labels,
  377. StorageClassName: pv.Spec.StorageClassName,
  378. StorageClassParameters: parameters,
  379. Name: pv.Name,
  380. }
  381. }
  382. func (key *awsPVKey) GetStorageClass() string {
  383. return key.StorageClassName
  384. }
  385. func (key *awsPVKey) Features() string {
  386. storageClass := key.StorageClassParameters["type"]
  387. if storageClass == "standard" {
  388. storageClass = "gp2"
  389. }
  390. // Storage class names are generally EBS volume types (gp2)
  391. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  392. // Converts between the 2
  393. region := key.Labels[v1.LabelZoneRegion]
  394. //if region == "" {
  395. // region = "us-east-1"
  396. //}
  397. class, ok := volTypes[storageClass]
  398. if !ok {
  399. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  400. }
  401. return region + "," + class
  402. }
  403. // GetKey maps node labels to information needed to retrieve pricing data
  404. func (aws *AWS) GetKey(labels map[string]string) Key {
  405. return &awsKey{
  406. SpotLabelName: aws.SpotLabelName,
  407. SpotLabelValue: aws.SpotLabelValue,
  408. Labels: labels,
  409. ProviderID: labels["providerID"],
  410. }
  411. }
  412. func (aws *AWS) isPreemptible(key string) bool {
  413. s := strings.Split(key, ",")
  414. if len(s) == 4 && s[3] == "preemptible" {
  415. return true
  416. }
  417. return false
  418. }
  419. // DownloadPricingData fetches data from the AWS Pricing API
  420. func (aws *AWS) DownloadPricingData() error {
  421. aws.DownloadPricingDataLock.Lock()
  422. defer aws.DownloadPricingDataLock.Unlock()
  423. c, err := GetDefaultPricingData("aws.json")
  424. if err != nil {
  425. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  426. }
  427. aws.BaseCPUPrice = c.CPU
  428. aws.BaseRAMPrice = c.RAM
  429. aws.BaseGPUPrice = c.GPU
  430. aws.BaseSpotCPUPrice = c.SpotCPU
  431. aws.BaseSpotRAMPrice = c.SpotRAM
  432. aws.SpotLabelName = c.SpotLabel
  433. aws.SpotLabelValue = c.SpotLabelValue
  434. aws.SpotDataBucket = c.SpotDataBucket
  435. aws.SpotDataPrefix = c.SpotDataPrefix
  436. aws.ProjectID = c.ProjectID
  437. aws.SpotDataRegion = c.SpotDataRegion
  438. aws.ServiceKeyName = c.ServiceKeyName
  439. aws.ServiceKeySecret = c.ServiceKeySecret
  440. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  441. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  442. }
  443. nodeList := aws.Clientset.GetAllNodes()
  444. inputkeys := make(map[string]bool)
  445. for _, n := range nodeList {
  446. labels := n.GetObjectMeta().GetLabels()
  447. key := aws.GetKey(labels)
  448. inputkeys[key.Features()] = true
  449. }
  450. pvList := aws.Clientset.GetAllPersistentVolumes()
  451. storageClasses := aws.Clientset.GetAllStorageClasses()
  452. storageClassMap := make(map[string]map[string]string)
  453. for _, storageClass := range storageClasses {
  454. params := storageClass.Parameters
  455. storageClassMap[storageClass.ObjectMeta.Name] = params
  456. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  457. storageClassMap["default"] = params
  458. storageClassMap[""] = params
  459. }
  460. }
  461. pvkeys := make(map[string]PVKey)
  462. for _, pv := range pvList {
  463. params, ok := storageClassMap[pv.Spec.StorageClassName]
  464. if !ok {
  465. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  466. continue
  467. }
  468. key := aws.GetPVKey(pv, params)
  469. pvkeys[key.Features()] = key
  470. }
  471. reserved, err := aws.getReservedInstances()
  472. if err != nil {
  473. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  474. } else {
  475. klog.V(1).Infof("Found %d reserved instances", len(reserved))
  476. aws.ReservedInstances = reserved
  477. for _, r := range reserved {
  478. klog.V(1).Infof("%s", r)
  479. }
  480. }
  481. aws.Pricing = make(map[string]*AWSProductTerms)
  482. aws.ValidPricingKeys = make(map[string]bool)
  483. skusToKeys := make(map[string]string)
  484. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  485. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  486. resp, err := http.Get(pricingURL)
  487. if err != nil {
  488. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  489. return err
  490. }
  491. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  492. dec := json.NewDecoder(resp.Body)
  493. for {
  494. t, err := dec.Token()
  495. if err == io.EOF {
  496. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  497. break
  498. }
  499. if t == "products" {
  500. _, err := dec.Token() // this should parse the opening "{""
  501. if err != nil {
  502. return err
  503. }
  504. for dec.More() {
  505. _, err := dec.Token() // the sku token
  506. if err != nil {
  507. return err
  508. }
  509. product := &AWSProduct{}
  510. err = dec.Decode(&product)
  511. if err != nil {
  512. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  513. break
  514. }
  515. if product.Attributes.PreInstalledSw == "NA" &&
  516. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  517. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  518. spotKey := key + ",preemptible"
  519. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  520. productTerms := &AWSProductTerms{
  521. Sku: product.Sku,
  522. Memory: product.Attributes.Memory,
  523. Storage: product.Attributes.Storage,
  524. VCpu: product.Attributes.VCpu,
  525. GPU: product.Attributes.GPU,
  526. }
  527. aws.Pricing[key] = productTerms
  528. aws.Pricing[spotKey] = productTerms
  529. skusToKeys[product.Sku] = key
  530. }
  531. aws.ValidPricingKeys[key] = true
  532. aws.ValidPricingKeys[spotKey] = true
  533. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  534. // UsageTypes may be prefixed with a region code - we're removing this when using
  535. // volTypes to keep lookups generic
  536. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  537. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  538. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  539. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  540. spotKey := key + ",preemptible"
  541. pv := &PV{
  542. Class: volTypes[usageTypeNoRegion],
  543. Region: locationToRegion[product.Attributes.Location],
  544. }
  545. productTerms := &AWSProductTerms{
  546. Sku: product.Sku,
  547. PV: pv,
  548. }
  549. aws.Pricing[key] = productTerms
  550. aws.Pricing[spotKey] = productTerms
  551. skusToKeys[product.Sku] = key
  552. aws.ValidPricingKeys[key] = true
  553. aws.ValidPricingKeys[spotKey] = true
  554. }
  555. }
  556. }
  557. if t == "terms" {
  558. _, err := dec.Token() // this should parse the opening "{""
  559. if err != nil {
  560. return err
  561. }
  562. termType, err := dec.Token()
  563. if err != nil {
  564. return err
  565. }
  566. if termType == "OnDemand" {
  567. _, err := dec.Token()
  568. if err != nil { // again, should parse an opening "{"
  569. return err
  570. }
  571. for dec.More() {
  572. sku, err := dec.Token()
  573. if err != nil {
  574. return err
  575. }
  576. _, err = dec.Token() // another opening "{"
  577. if err != nil {
  578. return err
  579. }
  580. skuOnDemand, err := dec.Token()
  581. if err != nil {
  582. return err
  583. }
  584. offerTerm := &AWSOfferTerm{}
  585. err = dec.Decode(&offerTerm)
  586. if err != nil {
  587. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  588. }
  589. if sku.(string)+OnDemandRateCode == skuOnDemand {
  590. key, ok := skusToKeys[sku.(string)]
  591. spotKey := key + ",preemptible"
  592. if ok {
  593. aws.Pricing[key].OnDemand = offerTerm
  594. aws.Pricing[spotKey].OnDemand = offerTerm
  595. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  596. // If the specific UsageType is the per IO cost used on io1 volumes
  597. // we need to add the per IO cost to the io1 PV cost
  598. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  599. // Add the per IO cost to the PV object for the io1 volume type
  600. aws.Pricing[key].PV.CostPerIO = cost
  601. } else if strings.Contains(key, "EBS:Volume") {
  602. // If volume, we need to get hourly cost and add it to the PV object
  603. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  604. costFloat, _ := strconv.ParseFloat(cost, 64)
  605. hourlyPrice := costFloat / 730
  606. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  607. }
  608. }
  609. }
  610. _, err = dec.Token()
  611. if err != nil {
  612. return err
  613. }
  614. }
  615. _, err = dec.Token()
  616. if err != nil {
  617. return err
  618. }
  619. }
  620. }
  621. }
  622. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  623. if err != nil {
  624. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  625. } else {
  626. aws.SpotPricingByInstanceID = sp
  627. }
  628. return nil
  629. }
  630. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  631. func (c *AWS) NetworkPricing() (*Network, error) {
  632. cpricing, err := GetDefaultPricingData("aws.json")
  633. if err != nil {
  634. return nil, err
  635. }
  636. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  637. if err != nil {
  638. return nil, err
  639. }
  640. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  641. if err != nil {
  642. return nil, err
  643. }
  644. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  645. if err != nil {
  646. return nil, err
  647. }
  648. return &Network{
  649. ZoneNetworkEgressCost: znec,
  650. RegionNetworkEgressCost: rnec,
  651. InternetNetworkEgressCost: inec,
  652. }, nil
  653. }
  654. // AllNodePricing returns all the billing data fetched.
  655. func (aws *AWS) AllNodePricing() (interface{}, error) {
  656. aws.DownloadPricingDataLock.RLock()
  657. defer aws.DownloadPricingDataLock.RUnlock()
  658. return aws.Pricing, nil
  659. }
  660. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  661. key := k.Features()
  662. if aws.isPreemptible(key) {
  663. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  664. var spotcost string
  665. arr := strings.Split(spotInfo.Charge, " ")
  666. if len(arr) == 2 {
  667. spotcost = arr[0]
  668. } else {
  669. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  670. }
  671. return &Node{
  672. Cost: spotcost,
  673. VCPU: terms.VCpu,
  674. RAM: terms.Memory,
  675. GPU: terms.GPU,
  676. Storage: terms.Storage,
  677. BaseCPUPrice: aws.BaseCPUPrice,
  678. BaseRAMPrice: aws.BaseRAMPrice,
  679. BaseGPUPrice: aws.BaseGPUPrice,
  680. UsageType: usageType,
  681. }, nil
  682. }
  683. return &Node{
  684. VCPU: terms.VCpu,
  685. VCPUCost: aws.BaseSpotCPUPrice,
  686. RAM: terms.Memory,
  687. GPU: terms.GPU,
  688. RAMCost: aws.BaseSpotRAMPrice,
  689. Storage: terms.Storage,
  690. BaseCPUPrice: aws.BaseCPUPrice,
  691. BaseRAMPrice: aws.BaseRAMPrice,
  692. BaseGPUPrice: aws.BaseGPUPrice,
  693. UsageType: usageType,
  694. }, nil
  695. }
  696. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  697. if !ok {
  698. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  699. }
  700. cost := c.PricePerUnit.USD
  701. return &Node{
  702. Cost: cost,
  703. VCPU: terms.VCpu,
  704. RAM: terms.Memory,
  705. GPU: terms.GPU,
  706. Storage: terms.Storage,
  707. BaseCPUPrice: aws.BaseCPUPrice,
  708. BaseRAMPrice: aws.BaseRAMPrice,
  709. BaseGPUPrice: aws.BaseGPUPrice,
  710. UsageType: usageType,
  711. }, nil
  712. }
  713. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  714. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  715. aws.DownloadPricingDataLock.RLock()
  716. defer aws.DownloadPricingDataLock.RUnlock()
  717. key := k.Features()
  718. usageType := "ondemand"
  719. if aws.isPreemptible(key) {
  720. usageType = "preemptible"
  721. }
  722. terms, ok := aws.Pricing[key]
  723. if ok {
  724. return aws.createNode(terms, usageType, k)
  725. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  726. aws.DownloadPricingDataLock.RUnlock()
  727. err := aws.DownloadPricingData()
  728. aws.DownloadPricingDataLock.RLock()
  729. if err != nil {
  730. return &Node{
  731. Cost: aws.BaseCPUPrice,
  732. BaseCPUPrice: aws.BaseCPUPrice,
  733. BaseRAMPrice: aws.BaseRAMPrice,
  734. BaseGPUPrice: aws.BaseGPUPrice,
  735. UsageType: usageType,
  736. UsesBaseCPUPrice: true,
  737. }, err
  738. }
  739. terms, termsOk := aws.Pricing[key]
  740. if !termsOk {
  741. return &Node{
  742. Cost: aws.BaseCPUPrice,
  743. BaseCPUPrice: aws.BaseCPUPrice,
  744. BaseRAMPrice: aws.BaseRAMPrice,
  745. BaseGPUPrice: aws.BaseGPUPrice,
  746. UsageType: usageType,
  747. UsesBaseCPUPrice: true,
  748. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  749. }
  750. return aws.createNode(terms, usageType, k)
  751. } else { // Fall back to base pricing if we can't find the key.
  752. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  753. return &Node{
  754. Cost: aws.BaseCPUPrice,
  755. BaseCPUPrice: aws.BaseCPUPrice,
  756. BaseRAMPrice: aws.BaseRAMPrice,
  757. BaseGPUPrice: aws.BaseGPUPrice,
  758. UsageType: usageType,
  759. UsesBaseCPUPrice: true,
  760. }, nil
  761. }
  762. }
  763. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  764. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  765. defaultClusterName := "AWS Cluster #1"
  766. c, err := awsProvider.GetConfig()
  767. if err != nil {
  768. return nil, err
  769. }
  770. remote := os.Getenv(remoteEnabled)
  771. remoteEnabled := false
  772. if os.Getenv(remote) == "true" {
  773. remoteEnabled = true
  774. }
  775. if c.ClusterName != "" {
  776. m := make(map[string]string)
  777. m["name"] = c.ClusterName
  778. m["provider"] = "AWS"
  779. m["id"] = os.Getenv(clusterIDKey)
  780. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  781. return m, nil
  782. }
  783. makeStructure := func(clusterName string) (map[string]string, error) {
  784. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  785. m := make(map[string]string)
  786. m["name"] = clusterName
  787. m["provider"] = "AWS"
  788. m["id"] = os.Getenv(clusterIDKey)
  789. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  790. return m, nil
  791. }
  792. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  793. if len(maybeClusterId) != 0 {
  794. return makeStructure(maybeClusterId)
  795. }
  796. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  797. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  798. nodeList := awsProvider.Clientset.GetAllNodes()
  799. for _, n := range nodeList {
  800. region := ""
  801. instanceId := ""
  802. providerId := n.Spec.ProviderID
  803. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  804. if matchNum == 1 {
  805. region = group
  806. } else if matchNum == 2 {
  807. instanceId = group
  808. }
  809. }
  810. if len(instanceId) == 0 {
  811. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  812. continue
  813. }
  814. c := &aws.Config{
  815. Region: aws.String(region),
  816. }
  817. s := session.Must(session.NewSession(c))
  818. ec2Svc := ec2.New(s)
  819. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  820. InstanceIds: []*string{
  821. aws.String(instanceId),
  822. },
  823. })
  824. if diErr != nil {
  825. // maybe log this?
  826. continue
  827. }
  828. if len(di.Reservations) != 1 {
  829. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  830. continue
  831. }
  832. res := di.Reservations[0]
  833. if len(res.Instances) != 1 {
  834. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  835. continue
  836. }
  837. inst := res.Instances[0]
  838. for _, tag := range inst.Tags {
  839. tagKey := *tag.Key
  840. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  841. if matchNum != 1 {
  842. continue
  843. }
  844. return makeStructure(group)
  845. }
  846. }
  847. }
  848. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  849. return makeStructure(defaultClusterName)
  850. }
  851. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  852. func (*AWS) AddServiceKey(formValues url.Values) error {
  853. keyID := formValues.Get("access_key_ID")
  854. key := formValues.Get("secret_access_key")
  855. m := make(map[string]string)
  856. m["access_key_ID"] = keyID
  857. m["secret_access_key"] = key
  858. result, err := json.Marshal(m)
  859. if err != nil {
  860. return err
  861. }
  862. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  863. }
  864. func configureAWSAuth(keyFile string) error {
  865. jsonFile, err := os.Open(keyFile)
  866. if err != nil {
  867. if os.IsNotExist(err) {
  868. klog.V(2).Infof("Using Default Credentials")
  869. return nil
  870. }
  871. return err
  872. }
  873. defer jsonFile.Close()
  874. byteValue, _ := ioutil.ReadAll(jsonFile)
  875. var result map[string]string
  876. err = json.Unmarshal([]byte(byteValue), &result)
  877. if err != nil {
  878. return err
  879. }
  880. err = os.Setenv(awsAccessKeyIDEnvVar, result["awsServiceKeyName"])
  881. if err != nil {
  882. return err
  883. }
  884. err = os.Setenv(awsAccessKeySecretEnvVar, result["awsServiceKeySecret"])
  885. if err != nil {
  886. return err
  887. }
  888. return nil
  889. }
  890. func getClusterConfig(ccFile string) (map[string]string, error) {
  891. clusterConfig, err := os.Open(ccFile)
  892. if err != nil {
  893. return nil, err
  894. }
  895. defer clusterConfig.Close()
  896. b, err := ioutil.ReadAll(clusterConfig)
  897. if err != nil {
  898. return nil, err
  899. }
  900. var clusterConf map[string]string
  901. err = json.Unmarshal([]byte(b), &clusterConf)
  902. if err != nil {
  903. return nil, err
  904. }
  905. return clusterConf, nil
  906. }
  907. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  908. func (*AWS) GetDisks() ([]byte, error) {
  909. err := configureAWSAuth("/var/configs/key.json")
  910. if err != nil {
  911. return nil, err
  912. }
  913. clusterConfig, err := getClusterConfig("/var/configs/cluster.json")
  914. if err != nil {
  915. return nil, err
  916. }
  917. region := aws.String(clusterConfig["region"])
  918. c := &aws.Config{
  919. Region: region,
  920. }
  921. s := session.Must(session.NewSession(c))
  922. ec2Svc := ec2.New(s)
  923. input := &ec2.DescribeVolumesInput{}
  924. volumeResult, err := ec2Svc.DescribeVolumes(input)
  925. if err != nil {
  926. if aerr, ok := err.(awserr.Error); ok {
  927. switch aerr.Code() {
  928. default:
  929. return nil, aerr
  930. }
  931. } else {
  932. return nil, err
  933. }
  934. }
  935. return json.Marshal(volumeResult)
  936. }
  937. // ConvertToGlueColumnFormat takes a string and runs through various regex
  938. // and string replacement statements to convert it to a format compatible
  939. // with AWS Glue and Athena column names.
  940. // Following guidance from AWS provided here ('Column Names' section):
  941. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  942. // It returns a string containing the column name in proper column name format and length.
  943. func ConvertToGlueColumnFormat(column_name string) string {
  944. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  945. // An underscore is added in front of uppercase letters
  946. capital_underscore := regexp.MustCompile(`[A-Z]`)
  947. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  948. // Any non-alphanumeric characters are replaced with an underscore
  949. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  950. final = no_space_punc.ReplaceAllString(final, "_")
  951. // Duplicate underscores are removed
  952. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  953. final = no_dup_underscore.ReplaceAllString(final, "_")
  954. // Any leading and trailing underscores are removed
  955. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  956. final = no_front_end_underscore.ReplaceAllString(final, "")
  957. // Uppercase to lowercase
  958. final = strings.ToLower(final)
  959. // Longer column name than expected - remove _ left to right
  960. allowed_col_len := 128
  961. undersc_to_remove := len(final) - allowed_col_len
  962. if undersc_to_remove > 0 {
  963. final = strings.Replace(final, "_", "", undersc_to_remove)
  964. }
  965. // If removing all of the underscores still didn't
  966. // make the column name < 128 characters, trim it!
  967. if len(final) > allowed_col_len {
  968. final = final[:allowed_col_len]
  969. }
  970. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  971. return final
  972. }
  973. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  974. // "start" and "end" are dates of the format YYYY-MM-DD
  975. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  976. func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
  977. customPricing, err := a.GetConfig()
  978. if err != nil {
  979. return nil, err
  980. }
  981. aggregator_column_name := "resource_tags_user_" + aggregator
  982. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  983. query := fmt.Sprintf(`SELECT
  984. CAST(line_item_usage_start_date AS DATE) as start_date,
  985. %s,
  986. line_item_product_code,
  987. SUM(line_item_blended_cost) as blended_cost
  988. FROM %s as cost_data
  989. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  990. GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
  991. if customPricing.ServiceKeyName != "" {
  992. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  993. if err != nil {
  994. return nil, err
  995. }
  996. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  997. if err != nil {
  998. return nil, err
  999. }
  1000. }
  1001. region := aws.String(customPricing.AthenaRegion)
  1002. resultsBucket := customPricing.AthenaBucketName
  1003. database := customPricing.AthenaDatabase
  1004. c := &aws.Config{
  1005. Region: region,
  1006. }
  1007. s := session.Must(session.NewSession(c))
  1008. svc := athena.New(s)
  1009. var e athena.StartQueryExecutionInput
  1010. var r athena.ResultConfiguration
  1011. r.SetOutputLocation(resultsBucket)
  1012. e.SetResultConfiguration(&r)
  1013. e.SetQueryString(query)
  1014. var q athena.QueryExecutionContext
  1015. q.SetDatabase(database)
  1016. e.SetQueryExecutionContext(&q)
  1017. res, err := svc.StartQueryExecution(&e)
  1018. if err != nil {
  1019. return nil, err
  1020. }
  1021. klog.V(2).Infof("StartQueryExecution result:")
  1022. klog.V(2).Infof(res.GoString())
  1023. var qri athena.GetQueryExecutionInput
  1024. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1025. var qrop *athena.GetQueryExecutionOutput
  1026. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1027. for {
  1028. qrop, err = svc.GetQueryExecution(&qri)
  1029. if err != nil {
  1030. return nil, err
  1031. }
  1032. if *qrop.QueryExecution.Status.State != "RUNNING" {
  1033. break
  1034. }
  1035. time.Sleep(duration)
  1036. }
  1037. var oocAllocs []*OutOfClusterAllocation
  1038. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1039. var ip athena.GetQueryResultsInput
  1040. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1041. op, err := svc.GetQueryResults(&ip)
  1042. if err != nil {
  1043. return nil, err
  1044. }
  1045. if len(op.ResultSet.Rows) > 1 {
  1046. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1047. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1048. if err != nil {
  1049. return nil, err
  1050. }
  1051. ooc := &OutOfClusterAllocation{
  1052. Aggregator: aggregator,
  1053. Environment: *r.Data[1].VarCharValue,
  1054. Service: *r.Data[2].VarCharValue,
  1055. Cost: cost,
  1056. }
  1057. oocAllocs = append(oocAllocs, ooc)
  1058. }
  1059. } else {
  1060. klog.V(1).Infof("No results available for %s at database %s between %s and %s", aggregator_column_name, customPricing.AthenaTable, start, end)
  1061. }
  1062. }
  1063. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  1064. }
  1065. // QuerySQL can query a properly configured Athena database.
  1066. // Used to fetch billing data.
  1067. // Requires a json config in /var/configs with key region, output, and database.
  1068. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1069. customPricing, err := a.GetConfig()
  1070. if err != nil {
  1071. return nil, err
  1072. }
  1073. if customPricing.ServiceKeyName != "" {
  1074. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1075. if err != nil {
  1076. return nil, err
  1077. }
  1078. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1079. if err != nil {
  1080. return nil, err
  1081. }
  1082. }
  1083. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1084. if err != nil {
  1085. return nil, err
  1086. }
  1087. defer athenaConfigs.Close()
  1088. b, err := ioutil.ReadAll(athenaConfigs)
  1089. if err != nil {
  1090. return nil, err
  1091. }
  1092. var athenaConf map[string]string
  1093. json.Unmarshal([]byte(b), &athenaConf)
  1094. region := aws.String(customPricing.AthenaRegion)
  1095. resultsBucket := customPricing.AthenaBucketName
  1096. database := customPricing.AthenaDatabase
  1097. c := &aws.Config{
  1098. Region: region,
  1099. }
  1100. s := session.Must(session.NewSession(c))
  1101. svc := athena.New(s)
  1102. var e athena.StartQueryExecutionInput
  1103. var r athena.ResultConfiguration
  1104. r.SetOutputLocation(resultsBucket)
  1105. e.SetResultConfiguration(&r)
  1106. e.SetQueryString(query)
  1107. var q athena.QueryExecutionContext
  1108. q.SetDatabase(database)
  1109. e.SetQueryExecutionContext(&q)
  1110. res, err := svc.StartQueryExecution(&e)
  1111. if err != nil {
  1112. return nil, err
  1113. }
  1114. klog.V(2).Infof("StartQueryExecution result:")
  1115. klog.V(2).Infof(res.GoString())
  1116. var qri athena.GetQueryExecutionInput
  1117. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1118. var qrop *athena.GetQueryExecutionOutput
  1119. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1120. for {
  1121. qrop, err = svc.GetQueryExecution(&qri)
  1122. if err != nil {
  1123. return nil, err
  1124. }
  1125. if *qrop.QueryExecution.Status.State != "RUNNING" {
  1126. break
  1127. }
  1128. time.Sleep(duration)
  1129. }
  1130. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1131. var ip athena.GetQueryResultsInput
  1132. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1133. op, err := svc.GetQueryResults(&ip)
  1134. if err != nil {
  1135. return nil, err
  1136. }
  1137. b, err := json.Marshal(op.ResultSet)
  1138. if err != nil {
  1139. return nil, err
  1140. }
  1141. return b, nil
  1142. }
  1143. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1144. }
// spotInfo is one row of a spot data feed file as decoded by parseSpotData.
// Every column is kept as the raw string read from the tab-separated file.
// The csv tags supply the header that parseSpotData hands to the decoder;
// presumably the field order here fixes the column order, so fields should
// not be reordered without checking the feed layout.
type spotInfo struct {
	Timestamp   string `csv:"Timestamp"`
	UsageType   string `csv:"UsageType"`
	Operation   string `csv:"Operation"`
	InstanceID  string `csv:"InstanceID"` // key under which parseSpotData stores the row
	MyBidID     string `csv:"MyBidID"`
	MyMaxPrice  string `csv:"MyMaxPrice"`
	MarketPrice string `csv:"MarketPrice"`
	Charge      string `csv:"Charge"` // createNode splits this on a space and uses the first part as the cost
	Version     string `csv:"Version"`
}
  1156. type fnames []*string
  1157. func (f fnames) Len() int {
  1158. return len(f)
  1159. }
  1160. func (f fnames) Swap(i, j int) {
  1161. f[i], f[j] = f[j], f[i]
  1162. }
  1163. func (f fnames) Less(i, j int) bool {
  1164. key1 := strings.Split(*f[i], ".")
  1165. key2 := strings.Split(*f[j], ".")
  1166. t1, err := time.Parse("2006-01-02-15", key1[1])
  1167. if err != nil {
  1168. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1169. return false
  1170. }
  1171. t2, err := time.Parse("2006-01-02-15", key2[1])
  1172. if err != nil {
  1173. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1174. return false
  1175. }
  1176. return t1.Before(t2)
  1177. }
  1178. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1179. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1180. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  1181. if err != nil {
  1182. return nil, err
  1183. }
  1184. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  1185. if err != nil {
  1186. return nil, err
  1187. }
  1188. }
  1189. s3Prefix := projectID
  1190. if len(prefix) != 0 {
  1191. s3Prefix = prefix + "/" + s3Prefix
  1192. }
  1193. c := aws.NewConfig().WithRegion(region)
  1194. s := session.Must(session.NewSession(c))
  1195. s3Svc := s3.New(s)
  1196. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1197. tNow := time.Now()
  1198. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1199. ls := &s3.ListObjectsInput{
  1200. Bucket: aws.String(bucket),
  1201. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1202. }
  1203. ls2 := &s3.ListObjectsInput{
  1204. Bucket: aws.String(bucket),
  1205. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1206. }
  1207. lso, err := s3Svc.ListObjects(ls)
  1208. if err != nil {
  1209. return nil, err
  1210. }
  1211. lsoLen := len(lso.Contents)
  1212. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1213. if lsoLen == 0 {
  1214. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1215. }
  1216. lso2, err := s3Svc.ListObjects(ls2)
  1217. if err != nil {
  1218. return nil, err
  1219. }
  1220. lso2Len := len(lso2.Contents)
  1221. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1222. if lso2Len == 0 {
  1223. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1224. }
  1225. var keys []*string
  1226. for _, obj := range lso.Contents {
  1227. keys = append(keys, obj.Key)
  1228. }
  1229. for _, obj := range lso2.Contents {
  1230. keys = append(keys, obj.Key)
  1231. }
  1232. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1233. header, err := csvutil.Header(spotInfo{}, "csv")
  1234. if err != nil {
  1235. return nil, err
  1236. }
  1237. fieldsPerRecord := len(header)
  1238. spots := make(map[string]*spotInfo)
  1239. for _, key := range keys {
  1240. getObj := &s3.GetObjectInput{
  1241. Bucket: aws.String(bucket),
  1242. Key: key,
  1243. }
  1244. buf := aws.NewWriteAtBuffer([]byte{})
  1245. _, err := downloader.Download(buf, getObj)
  1246. if err != nil {
  1247. return nil, err
  1248. }
  1249. r := bytes.NewReader(buf.Bytes())
  1250. gr, err := gzip.NewReader(r)
  1251. if err != nil {
  1252. return nil, err
  1253. }
  1254. csvReader := csv.NewReader(gr)
  1255. csvReader.Comma = '\t'
  1256. csvReader.FieldsPerRecord = fieldsPerRecord
  1257. dec, err := csvutil.NewDecoder(csvReader, header...)
  1258. if err != nil {
  1259. return nil, err
  1260. }
  1261. var foundVersion string
  1262. for {
  1263. spot := spotInfo{}
  1264. err := dec.Decode(&spot)
  1265. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1266. if err == io.EOF {
  1267. break
  1268. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1269. rec := dec.Record()
  1270. // the first two "Record()" will be the comment lines
  1271. // and they show up as len() == 1
  1272. // the first of which is "#Version"
  1273. // the second of which is "#Fields: "
  1274. if len(rec) != 1 {
  1275. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1276. continue
  1277. }
  1278. if len(foundVersion) == 0 {
  1279. spotFeedVersion := rec[0]
  1280. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1281. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1282. if matches != nil {
  1283. foundVersion = matches[1]
  1284. if foundVersion != supportedSpotFeedVersion {
  1285. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1286. break
  1287. }
  1288. }
  1289. continue
  1290. } else if strings.Index(rec[0], "#") == 0 {
  1291. continue
  1292. } else {
  1293. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1294. continue
  1295. }
  1296. } else if err != nil {
  1297. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1298. continue
  1299. }
  1300. klog.V(4).Infof("Found spot info %+v", spot)
  1301. spots[spot.InstanceID] = &spot
  1302. }
  1303. gr.Close()
  1304. }
  1305. return spots, nil
  1306. }
  1307. func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1308. numReserved := len(a.ReservedInstances)
  1309. // Early return if no reserved instance data loaded
  1310. if numReserved == 0 {
  1311. klog.V(1).Infof("[Reserved] No Reserved Instances")
  1312. return
  1313. }
  1314. cfg, err := a.GetConfig()
  1315. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  1316. if err != nil {
  1317. klog.V(3).Infof("Could not parse default cpu price")
  1318. defaultCPU = 0.031611
  1319. }
  1320. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  1321. if err != nil {
  1322. klog.V(3).Infof("Could not parse default ram price")
  1323. defaultRAM = 0.004237
  1324. }
  1325. cpuToRAMRatio := defaultCPU / defaultRAM
  1326. now := time.Now()
  1327. instances := make(map[string][]*AWSReservedInstance)
  1328. for _, r := range a.ReservedInstances {
  1329. if now.Before(r.StartDate) || now.After(r.EndDate) {
  1330. klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
  1331. continue
  1332. }
  1333. _, ok := instances[r.Region]
  1334. if !ok {
  1335. instances[r.Region] = []*AWSReservedInstance{r}
  1336. } else {
  1337. instances[r.Region] = append(instances[r.Region], r)
  1338. }
  1339. }
  1340. awsNodes := make(map[string]*v1.Node)
  1341. currentNodes := a.Clientset.GetAllNodes()
  1342. // Create a node name -> node map
  1343. for _, awsNode := range currentNodes {
  1344. awsNodes[awsNode.GetName()] = awsNode
  1345. }
  1346. // go through all provider nodes using k8s nodes for region
  1347. for nodeName, node := range nodes {
  1348. // Reset reserved allocation to prevent double allocation
  1349. node.Reserved = nil
  1350. kNode, ok := awsNodes[nodeName]
  1351. if !ok {
  1352. klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
  1353. continue
  1354. }
  1355. nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
  1356. if !ok {
  1357. klog.V(1).Infof("[Reserved] Could not find node region")
  1358. continue
  1359. }
  1360. reservedInstances, ok := instances[nodeRegion]
  1361. if !ok {
  1362. klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
  1363. continue
  1364. }
  1365. // Determine the InstanceType of the node
  1366. instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
  1367. if !ok {
  1368. continue
  1369. }
  1370. ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
  1371. if err != nil {
  1372. continue
  1373. }
  1374. ramGB := ramBytes / 1024 / 1024 / 1024
  1375. cpu, err := strconv.ParseFloat(node.VCPU, 64)
  1376. if err != nil {
  1377. continue
  1378. }
  1379. ramMultiple := cpu*cpuToRAMRatio + ramGB
  1380. node.Reserved = &ReservedInstanceData{
  1381. ReservedCPU: 0,
  1382. ReservedRAM: 0,
  1383. }
  1384. for i, reservedInstance := range reservedInstances {
  1385. if reservedInstance.InstanceType == instanceType {
  1386. // Use < 0 to mark as ALL
  1387. node.Reserved.ReservedCPU = -1
  1388. node.Reserved.ReservedRAM = -1
  1389. // Set Costs based on CPU/RAM ratios
  1390. ramPrice := reservedInstance.PricePerHour / ramMultiple
  1391. node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
  1392. node.Reserved.RAMCost = ramPrice
  1393. // Remove the reserve from the temporary slice to prevent
  1394. // being reallocated
  1395. instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
  1396. break
  1397. }
  1398. }
  1399. }
  1400. }
  1401. type AWSReservedInstance struct {
  1402. Zone string
  1403. Region string
  1404. InstanceType string
  1405. InstanceCount int64
  1406. InstanceTenacy string
  1407. StartDate time.Time
  1408. EndDate time.Time
  1409. PricePerHour float64
  1410. }
  1411. func (ari *AWSReservedInstance) String() string {
  1412. return fmt.Sprintf("[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]", ari.Zone, ari.Region, ari.InstanceType, ari.InstanceCount, ari.InstanceTenacy, ari.StartDate, ari.EndDate, ari.PricePerHour)
  1413. }
  1414. func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
  1415. return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
  1416. }
  1417. func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
  1418. var pricePerHour float64
  1419. if len(ri.RecurringCharges) > 0 {
  1420. for _, rc := range ri.RecurringCharges {
  1421. if isReservedInstanceHourlyPrice(rc) {
  1422. pricePerHour = *rc.Amount
  1423. break
  1424. }
  1425. }
  1426. }
  1427. // If we're still unable to resolve hourly price, try fixed -> hourly
  1428. if pricePerHour == 0 {
  1429. if ri.Duration != nil && ri.FixedPrice != nil {
  1430. var durHours float64
  1431. durSeconds := float64(*ri.Duration)
  1432. fixedPrice := float64(*ri.FixedPrice)
  1433. if durSeconds != 0 && fixedPrice != 0 {
  1434. durHours = durSeconds / 60 / 60
  1435. pricePerHour = fixedPrice / durHours
  1436. }
  1437. }
  1438. }
  1439. if pricePerHour == 0 {
  1440. return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
  1441. }
  1442. return pricePerHour, nil
  1443. }
  1444. func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
  1445. c := &aws.Config{
  1446. Region: aws.String(region),
  1447. }
  1448. s := session.Must(session.NewSession(c))
  1449. svc := ec2.New(s)
  1450. response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
  1451. if err != nil {
  1452. return nil, err
  1453. }
  1454. var reservedInstances []*AWSReservedInstance
  1455. for _, ri := range response.ReservedInstances {
  1456. var zone string
  1457. if ri.AvailabilityZone != nil {
  1458. zone = *ri.AvailabilityZone
  1459. }
  1460. pricePerHour, err := getReservedInstancePrice(ri)
  1461. if err != nil {
  1462. klog.V(1).Infof("Error Resolving Price: %s", err.Error())
  1463. continue
  1464. }
  1465. reservedInstances = append(reservedInstances, &AWSReservedInstance{
  1466. Zone: zone,
  1467. Region: region,
  1468. InstanceType: *ri.InstanceType,
  1469. InstanceCount: *ri.InstanceCount,
  1470. InstanceTenacy: *ri.InstanceTenancy,
  1471. StartDate: *ri.Start,
  1472. EndDate: *ri.End,
  1473. PricePerHour: pricePerHour,
  1474. })
  1475. }
  1476. return reservedInstances, nil
  1477. }
  1478. func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
  1479. err := configureAWSAuth("/var/configs/aws.json")
  1480. if err != nil {
  1481. return nil, err
  1482. }
  1483. var reservedInstances []*AWSReservedInstance
  1484. nodes := a.Clientset.GetAllNodes()
  1485. regionsSeen := make(map[string]bool)
  1486. for _, node := range nodes {
  1487. region, ok := node.Labels[v1.LabelZoneRegion]
  1488. if !ok {
  1489. continue
  1490. }
  1491. if regionsSeen[region] {
  1492. continue
  1493. }
  1494. ris, err := getRegionReservedInstances(region)
  1495. if err != nil {
  1496. klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
  1497. continue
  1498. }
  1499. regionsSeen[region] = true
  1500. reservedInstances = append(reservedInstances, ris...)
  1501. }
  1502. return reservedInstances, nil
  1503. }