awsprovider.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "k8s.io/klog"
  19. "github.com/kubecost/cost-model/clustercache"
  20. "github.com/aws/aws-sdk-go/aws"
  21. "github.com/aws/aws-sdk-go/aws/awserr"
  22. "github.com/aws/aws-sdk-go/aws/session"
  23. "github.com/aws/aws-sdk-go/service/athena"
  24. "github.com/aws/aws-sdk-go/service/ec2"
  25. "github.com/aws/aws-sdk-go/service/s3"
  26. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  27. "github.com/jszwec/csvutil"
  28. v1 "k8s.io/api/core/v1"
  29. )
  30. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  31. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  32. const awsReservedInstancePricePerHour = 0.0287
  33. const supportedSpotFeedVersion = "1"
  34. const SpotInfoUpdateType = "spotinfo"
  35. const AthenaInfoUpdateType = "athenainfo"
  36. // AWS represents an Amazon Provider
  37. type AWS struct {
  38. Pricing map[string]*AWSProductTerms
  39. SpotPricingByInstanceID map[string]*spotInfo
  40. ValidPricingKeys map[string]bool
  41. Clientset clustercache.ClusterCache
  42. BaseCPUPrice string
  43. BaseRAMPrice string
  44. BaseGPUPrice string
  45. BaseSpotCPUPrice string
  46. BaseSpotRAMPrice string
  47. SpotLabelName string
  48. SpotLabelValue string
  49. ServiceKeyName string
  50. ServiceKeySecret string
  51. SpotDataRegion string
  52. SpotDataBucket string
  53. SpotDataPrefix string
  54. ProjectID string
  55. DownloadPricingDataLock sync.RWMutex
  56. ReservedInstances []*AWSReservedInstance
  57. *CustomProvider
  58. }
  59. // AWSPricing maps a k8s node to an AWS Pricing "product"
  60. type AWSPricing struct {
  61. Products map[string]*AWSProduct `json:"products"`
  62. Terms AWSPricingTerms `json:"terms"`
  63. }
  64. // AWSProduct represents a purchased SKU
  65. type AWSProduct struct {
  66. Sku string `json:"sku"`
  67. Attributes AWSProductAttributes `json:"attributes"`
  68. }
  69. // AWSProductAttributes represents metadata about the product used to map to a node.
  70. type AWSProductAttributes struct {
  71. Location string `json:"location"`
  72. InstanceType string `json:"instanceType"`
  73. Memory string `json:"memory"`
  74. Storage string `json:"storage"`
  75. VCpu string `json:"vcpu"`
  76. UsageType string `json:"usagetype"`
  77. OperatingSystem string `json:"operatingSystem"`
  78. PreInstalledSw string `json:"preInstalledSw"`
  79. InstanceFamily string `json:"instanceFamily"`
  80. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  81. }
  82. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  83. type AWSPricingTerms struct {
  84. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  85. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  86. }
  87. // AWSOfferTerm is a sku extension used to pay for the node.
  88. type AWSOfferTerm struct {
  89. Sku string `json:"sku"`
  90. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  91. }
  92. // AWSRateCode encodes data about the price of a product
  93. type AWSRateCode struct {
  94. Unit string `json:"unit"`
  95. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  96. }
  97. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  98. type AWSCurrencyCode struct {
  99. USD string `json:"USD"`
  100. }
  101. // AWSProductTerms represents the full terms of the product
  102. type AWSProductTerms struct {
  103. Sku string `json:"sku"`
  104. OnDemand *AWSOfferTerm `json:"OnDemand"`
  105. Reserved *AWSOfferTerm `json:"Reserved"`
  106. Memory string `json:"memory"`
  107. Storage string `json:"storage"`
  108. VCpu string `json:"vcpu"`
  109. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  110. PV *PV `json:"pv"`
  111. }
  112. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  113. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  114. // OnDemandRateCode is appended to an node sku
  115. const OnDemandRateCode = ".JRTCKXETXF"
  116. // ReservedRateCode is appended to a node sku
  117. const ReservedRateCode = ".38NPMPTW36"
  118. // HourlyRateCode is appended to a node sku
  119. const HourlyRateCode = ".6YS6EN2CT7"
  120. // volTypes are used to map between AWS UsageTypes and
  121. // EBS volume types, as they would appear in K8s storage class
  122. // name and the EC2 API.
  123. var volTypes = map[string]string{
  124. "EBS:VolumeUsage.gp2": "gp2",
  125. "EBS:VolumeUsage": "standard",
  126. "EBS:VolumeUsage.sc1": "sc1",
  127. "EBS:VolumeP-IOPS.piops": "io1",
  128. "EBS:VolumeUsage.st1": "st1",
  129. "EBS:VolumeUsage.piops": "io1",
  130. "gp2": "EBS:VolumeUsage.gp2",
  131. "standard": "EBS:VolumeUsage",
  132. "sc1": "EBS:VolumeUsage.sc1",
  133. "io1": "EBS:VolumeUsage.piops",
  134. "st1": "EBS:VolumeUsage.st1",
  135. }
  136. // locationToRegion maps AWS region names (As they come from Billing)
  137. // to actual region identifiers
  138. var locationToRegion = map[string]string{
  139. "US East (Ohio)": "us-east-2",
  140. "US East (N. Virginia)": "us-east-1",
  141. "US West (N. California)": "us-west-1",
  142. "US West (Oregon)": "us-west-2",
  143. "Asia Pacific (Hong Kong)": "ap-east-1",
  144. "Asia Pacific (Mumbai)": "ap-south-1",
  145. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  146. "Asia Pacific (Seoul)": "ap-northeast-2",
  147. "Asia Pacific (Singapore)": "ap-southeast-1",
  148. "Asia Pacific (Sydney)": "ap-southeast-2",
  149. "Asia Pacific (Tokyo)": "ap-northeast-1",
  150. "Canada (Central)": "ca-central-1",
  151. "China (Beijing)": "cn-north-1",
  152. "China (Ningxia)": "cn-northwest-1",
  153. "EU (Frankfurt)": "eu-central-1",
  154. "EU (Ireland)": "eu-west-1",
  155. "EU (London)": "eu-west-2",
  156. "EU (Paris)": "eu-west-3",
  157. "EU (Stockholm)": "eu-north-1",
  158. "South America (Sao Paulo)": "sa-east-1",
  159. "AWS GovCloud (US-East)": "us-gov-east-1",
  160. "AWS GovCloud (US)": "us-gov-west-1",
  161. }
  162. var regionToBillingRegionCode = map[string]string{
  163. "us-east-2": "USE2",
  164. "us-east-1": "",
  165. "us-west-1": "USW1",
  166. "us-west-2": "USW2",
  167. "ap-east-1": "APE1",
  168. "ap-south-1": "APS3",
  169. "ap-northeast-3": "APN3",
  170. "ap-northeast-2": "APN2",
  171. "ap-southeast-1": "APS1",
  172. "ap-southeast-2": "APS2",
  173. "ap-northeast-1": "APN1",
  174. "ca-central-1": "CAN1",
  175. "cn-north-1": "",
  176. "cn-northwest-1": "",
  177. "eu-central-1": "EUC1",
  178. "eu-west-1": "EU",
  179. "eu-west-2": "EUW2",
  180. "eu-west-3": "EUW3",
  181. "eu-north-1": "EUN1",
  182. "sa-east-1": "SAE1",
  183. "us-gov-east-1": "UGE1",
  184. "us-gov-west-1": "UGW1",
  185. }
  186. func (aws *AWS) GetLocalStorageQuery(offset string) (string, error) {
  187. return "", nil
  188. }
  189. // KubeAttrConversion maps the k8s labels for region to an aws region
  190. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  191. operatingSystem = strings.ToLower(operatingSystem)
  192. region := locationToRegion[location]
  193. return region + "," + instanceType + "," + operatingSystem
  194. }
  195. type AwsSpotFeedInfo struct {
  196. BucketName string `json:"bucketName"`
  197. Prefix string `json:"prefix"`
  198. Region string `json:"region"`
  199. AccountID string `json:"projectID"`
  200. ServiceKeyName string `json:"serviceKeyName"`
  201. ServiceKeySecret string `json:"serviceKeySecret"`
  202. SpotLabel string `json:"spotLabel"`
  203. SpotLabelValue string `json:"spotLabelValue"`
  204. }
  205. type AwsAthenaInfo struct {
  206. AthenaBucketName string `json:"athenaBucketName"`
  207. AthenaRegion string `json:"athenaRegion"`
  208. AthenaDatabase string `json:"athenaDatabase"`
  209. AthenaTable string `json:"athenaTable"`
  210. ServiceKeyName string `json:"serviceKeyName"`
  211. ServiceKeySecret string `json:"serviceKeySecret"`
  212. AccountID string `json:"projectID"`
  213. }
  214. func (aws *AWS) GetManagementPlatform() (string, error) {
  215. nodes := aws.Clientset.GetAllNodes()
  216. if len(nodes) > 0 {
  217. n := nodes[0]
  218. version := n.Status.NodeInfo.KubeletVersion
  219. if strings.Contains(version, "eks") {
  220. return "eks", nil
  221. }
  222. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  223. return "kops", nil
  224. }
  225. }
  226. return "", nil
  227. }
  228. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  229. c, err := GetDefaultPricingData("aws.json")
  230. if c.Discount == "" {
  231. c.Discount = "0%"
  232. }
  233. if c.NegotiatedDiscount == "" {
  234. c.NegotiatedDiscount = "0%"
  235. }
  236. if err != nil {
  237. return nil, err
  238. }
  239. return c, nil
  240. }
  241. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  242. c, err := GetDefaultPricingData("aws.json")
  243. if err != nil {
  244. return nil, err
  245. }
  246. path := os.Getenv("CONFIG_PATH")
  247. if path == "" {
  248. path = "/models/"
  249. }
  250. configPath := path + "aws.json"
  251. return configmapUpdate(c, configPath, a)
  252. }
  253. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  254. c, err := GetDefaultPricingData("aws.json")
  255. if err != nil {
  256. return nil, err
  257. }
  258. if updateType == SpotInfoUpdateType {
  259. a := AwsSpotFeedInfo{}
  260. err := json.NewDecoder(r).Decode(&a)
  261. if err != nil {
  262. return nil, err
  263. }
  264. if err != nil {
  265. return nil, err
  266. }
  267. c.ServiceKeyName = a.ServiceKeyName
  268. c.ServiceKeySecret = a.ServiceKeySecret
  269. c.SpotDataPrefix = a.Prefix
  270. c.SpotDataBucket = a.BucketName
  271. c.ProjectID = a.AccountID
  272. c.SpotDataRegion = a.Region
  273. c.SpotLabel = a.SpotLabel
  274. c.SpotLabelValue = a.SpotLabelValue
  275. } else if updateType == AthenaInfoUpdateType {
  276. a := AwsAthenaInfo{}
  277. err := json.NewDecoder(r).Decode(&a)
  278. if err != nil {
  279. return nil, err
  280. }
  281. c.AthenaBucketName = a.AthenaBucketName
  282. c.AthenaRegion = a.AthenaRegion
  283. c.AthenaDatabase = a.AthenaDatabase
  284. c.AthenaTable = a.AthenaTable
  285. c.ServiceKeyName = a.ServiceKeyName
  286. c.ServiceKeySecret = a.ServiceKeySecret
  287. c.ProjectID = a.AccountID
  288. } else {
  289. a := make(map[string]interface{})
  290. err = json.NewDecoder(r).Decode(&a)
  291. if err != nil {
  292. return nil, err
  293. }
  294. for k, v := range a {
  295. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  296. vstr, ok := v.(string)
  297. if ok {
  298. err := SetCustomPricingField(c, kUpper, vstr)
  299. if err != nil {
  300. return nil, err
  301. }
  302. } else {
  303. sci := v.(map[string]interface{})
  304. sc := make(map[string]string)
  305. for k, val := range sci {
  306. sc[k] = val.(string)
  307. }
  308. c.SharedCosts = sc //todo: support reflection/multiple map fields
  309. }
  310. }
  311. }
  312. cj, err := json.Marshal(c)
  313. if err != nil {
  314. return nil, err
  315. }
  316. path := os.Getenv("CONFIG_PATH")
  317. if path == "" {
  318. path = "/models/"
  319. }
  320. path += "aws.json"
  321. remoteEnabled := os.Getenv(remoteEnabled)
  322. if remoteEnabled == "true" {
  323. err = UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
  324. if err != nil {
  325. return nil, err
  326. }
  327. }
  328. err = ioutil.WriteFile(path, cj, 0644)
  329. if err != nil {
  330. return nil, err
  331. }
  332. return c, nil
  333. }
  334. type awsKey struct {
  335. SpotLabelName string
  336. SpotLabelValue string
  337. Labels map[string]string
  338. ProviderID string
  339. }
  340. func (k *awsKey) GPUType() string {
  341. return ""
  342. }
  343. func (k *awsKey) ID() string {
  344. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  345. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  346. if matchNum == 2 {
  347. return group
  348. }
  349. }
  350. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  351. return ""
  352. }
  353. func (k *awsKey) Features() string {
  354. instanceType := k.Labels[v1.LabelInstanceType]
  355. var operatingSystem string
  356. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  357. if !ok {
  358. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  359. }
  360. region := k.Labels[v1.LabelZoneRegion]
  361. key := region + "," + instanceType + "," + operatingSystem
  362. usageType := "preemptible"
  363. spotKey := key + "," + usageType
  364. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  365. return spotKey
  366. }
  367. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  368. return spotKey
  369. }
  370. return key
  371. }
  372. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  373. pricing, ok := aws.Pricing[pvk.Features()]
  374. if !ok {
  375. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  376. return &PV{}, nil
  377. }
  378. return pricing.PV, nil
  379. }
  380. type awsPVKey struct {
  381. Labels map[string]string
  382. StorageClassParameters map[string]string
  383. StorageClassName string
  384. Name string
  385. }
  386. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
  387. return &awsPVKey{
  388. Labels: pv.Labels,
  389. StorageClassName: pv.Spec.StorageClassName,
  390. StorageClassParameters: parameters,
  391. Name: pv.Name,
  392. }
  393. }
  394. func (key *awsPVKey) GetStorageClass() string {
  395. return key.StorageClassName
  396. }
  397. func (key *awsPVKey) Features() string {
  398. storageClass := key.StorageClassParameters["type"]
  399. if storageClass == "standard" {
  400. storageClass = "gp2"
  401. }
  402. // Storage class names are generally EBS volume types (gp2)
  403. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  404. // Converts between the 2
  405. region := key.Labels[v1.LabelZoneRegion]
  406. //if region == "" {
  407. // region = "us-east-1"
  408. //}
  409. class, ok := volTypes[storageClass]
  410. if !ok {
  411. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  412. }
  413. return region + "," + class
  414. }
  415. // GetKey maps node labels to information needed to retrieve pricing data
  416. func (aws *AWS) GetKey(labels map[string]string) Key {
  417. return &awsKey{
  418. SpotLabelName: aws.SpotLabelName,
  419. SpotLabelValue: aws.SpotLabelValue,
  420. Labels: labels,
  421. ProviderID: labels["providerID"],
  422. }
  423. }
  424. func (aws *AWS) isPreemptible(key string) bool {
  425. s := strings.Split(key, ",")
  426. if len(s) == 4 && s[3] == "preemptible" {
  427. return true
  428. }
  429. return false
  430. }
  431. // DownloadPricingData fetches data from the AWS Pricing API
  432. func (aws *AWS) DownloadPricingData() error {
  433. aws.DownloadPricingDataLock.Lock()
  434. defer aws.DownloadPricingDataLock.Unlock()
  435. c, err := GetDefaultPricingData("aws.json")
  436. if err != nil {
  437. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  438. }
  439. aws.BaseCPUPrice = c.CPU
  440. aws.BaseRAMPrice = c.RAM
  441. aws.BaseGPUPrice = c.GPU
  442. aws.BaseSpotCPUPrice = c.SpotCPU
  443. aws.BaseSpotRAMPrice = c.SpotRAM
  444. aws.SpotLabelName = c.SpotLabel
  445. aws.SpotLabelValue = c.SpotLabelValue
  446. aws.SpotDataBucket = c.SpotDataBucket
  447. aws.SpotDataPrefix = c.SpotDataPrefix
  448. aws.ProjectID = c.ProjectID
  449. aws.SpotDataRegion = c.SpotDataRegion
  450. aws.ServiceKeyName = c.ServiceKeyName
  451. aws.ServiceKeySecret = c.ServiceKeySecret
  452. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  453. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  454. }
  455. nodeList := aws.Clientset.GetAllNodes()
  456. inputkeys := make(map[string]bool)
  457. for _, n := range nodeList {
  458. labels := n.GetObjectMeta().GetLabels()
  459. key := aws.GetKey(labels)
  460. inputkeys[key.Features()] = true
  461. }
  462. pvList := aws.Clientset.GetAllPersistentVolumes()
  463. storageClasses := aws.Clientset.GetAllStorageClasses()
  464. storageClassMap := make(map[string]map[string]string)
  465. for _, storageClass := range storageClasses {
  466. params := storageClass.Parameters
  467. storageClassMap[storageClass.ObjectMeta.Name] = params
  468. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  469. storageClassMap["default"] = params
  470. storageClassMap[""] = params
  471. }
  472. }
  473. pvkeys := make(map[string]PVKey)
  474. for _, pv := range pvList {
  475. params, ok := storageClassMap[pv.Spec.StorageClassName]
  476. if !ok {
  477. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  478. continue
  479. }
  480. key := aws.GetPVKey(pv, params)
  481. pvkeys[key.Features()] = key
  482. }
  483. reserved, err := aws.getReservedInstances()
  484. if err != nil {
  485. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  486. } else {
  487. klog.V(1).Infof("Found %d reserved instances", len(reserved))
  488. aws.ReservedInstances = reserved
  489. for _, r := range reserved {
  490. klog.V(1).Infof("%s", r)
  491. }
  492. }
  493. aws.Pricing = make(map[string]*AWSProductTerms)
  494. aws.ValidPricingKeys = make(map[string]bool)
  495. skusToKeys := make(map[string]string)
  496. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  497. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  498. resp, err := http.Get(pricingURL)
  499. if err != nil {
  500. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  501. return err
  502. }
  503. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  504. dec := json.NewDecoder(resp.Body)
  505. for {
  506. t, err := dec.Token()
  507. if err == io.EOF {
  508. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  509. break
  510. }
  511. if t == "products" {
  512. _, err := dec.Token() // this should parse the opening "{""
  513. if err != nil {
  514. return err
  515. }
  516. for dec.More() {
  517. _, err := dec.Token() // the sku token
  518. if err != nil {
  519. return err
  520. }
  521. product := &AWSProduct{}
  522. err = dec.Decode(&product)
  523. if err != nil {
  524. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  525. break
  526. }
  527. if product.Attributes.PreInstalledSw == "NA" &&
  528. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  529. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  530. spotKey := key + ",preemptible"
  531. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  532. productTerms := &AWSProductTerms{
  533. Sku: product.Sku,
  534. Memory: product.Attributes.Memory,
  535. Storage: product.Attributes.Storage,
  536. VCpu: product.Attributes.VCpu,
  537. GPU: product.Attributes.GPU,
  538. }
  539. aws.Pricing[key] = productTerms
  540. aws.Pricing[spotKey] = productTerms
  541. skusToKeys[product.Sku] = key
  542. }
  543. aws.ValidPricingKeys[key] = true
  544. aws.ValidPricingKeys[spotKey] = true
  545. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  546. // UsageTypes may be prefixed with a region code - we're removing this when using
  547. // volTypes to keep lookups generic
  548. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  549. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  550. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  551. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  552. spotKey := key + ",preemptible"
  553. pv := &PV{
  554. Class: volTypes[usageTypeNoRegion],
  555. Region: locationToRegion[product.Attributes.Location],
  556. }
  557. productTerms := &AWSProductTerms{
  558. Sku: product.Sku,
  559. PV: pv,
  560. }
  561. aws.Pricing[key] = productTerms
  562. aws.Pricing[spotKey] = productTerms
  563. skusToKeys[product.Sku] = key
  564. aws.ValidPricingKeys[key] = true
  565. aws.ValidPricingKeys[spotKey] = true
  566. }
  567. }
  568. }
  569. if t == "terms" {
  570. _, err := dec.Token() // this should parse the opening "{""
  571. if err != nil {
  572. return err
  573. }
  574. termType, err := dec.Token()
  575. if err != nil {
  576. return err
  577. }
  578. if termType == "OnDemand" {
  579. _, err := dec.Token()
  580. if err != nil { // again, should parse an opening "{"
  581. return err
  582. }
  583. for dec.More() {
  584. sku, err := dec.Token()
  585. if err != nil {
  586. return err
  587. }
  588. _, err = dec.Token() // another opening "{"
  589. if err != nil {
  590. return err
  591. }
  592. skuOnDemand, err := dec.Token()
  593. if err != nil {
  594. return err
  595. }
  596. offerTerm := &AWSOfferTerm{}
  597. err = dec.Decode(&offerTerm)
  598. if err != nil {
  599. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  600. }
  601. if sku.(string)+OnDemandRateCode == skuOnDemand {
  602. key, ok := skusToKeys[sku.(string)]
  603. spotKey := key + ",preemptible"
  604. if ok {
  605. aws.Pricing[key].OnDemand = offerTerm
  606. aws.Pricing[spotKey].OnDemand = offerTerm
  607. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  608. // If the specific UsageType is the per IO cost used on io1 volumes
  609. // we need to add the per IO cost to the io1 PV cost
  610. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  611. // Add the per IO cost to the PV object for the io1 volume type
  612. aws.Pricing[key].PV.CostPerIO = cost
  613. } else if strings.Contains(key, "EBS:Volume") {
  614. // If volume, we need to get hourly cost and add it to the PV object
  615. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  616. costFloat, _ := strconv.ParseFloat(cost, 64)
  617. hourlyPrice := costFloat / 730
  618. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  619. }
  620. }
  621. }
  622. _, err = dec.Token()
  623. if err != nil {
  624. return err
  625. }
  626. }
  627. _, err = dec.Token()
  628. if err != nil {
  629. return err
  630. }
  631. }
  632. }
  633. }
  634. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  635. if err != nil {
  636. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  637. } else {
  638. aws.SpotPricingByInstanceID = sp
  639. }
  640. return nil
  641. }
  642. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  643. func (c *AWS) NetworkPricing() (*Network, error) {
  644. cpricing, err := GetDefaultPricingData("aws.json")
  645. if err != nil {
  646. return nil, err
  647. }
  648. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  649. if err != nil {
  650. return nil, err
  651. }
  652. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  653. if err != nil {
  654. return nil, err
  655. }
  656. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  657. if err != nil {
  658. return nil, err
  659. }
  660. return &Network{
  661. ZoneNetworkEgressCost: znec,
  662. RegionNetworkEgressCost: rnec,
  663. InternetNetworkEgressCost: inec,
  664. }, nil
  665. }
  666. // AllNodePricing returns all the billing data fetched.
  667. func (aws *AWS) AllNodePricing() (interface{}, error) {
  668. aws.DownloadPricingDataLock.RLock()
  669. defer aws.DownloadPricingDataLock.RUnlock()
  670. return aws.Pricing, nil
  671. }
  672. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  673. key := k.Features()
  674. if aws.isPreemptible(key) {
  675. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  676. var spotcost string
  677. arr := strings.Split(spotInfo.Charge, " ")
  678. if len(arr) == 2 {
  679. spotcost = arr[0]
  680. } else {
  681. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  682. }
  683. return &Node{
  684. Cost: spotcost,
  685. VCPU: terms.VCpu,
  686. RAM: terms.Memory,
  687. GPU: terms.GPU,
  688. Storage: terms.Storage,
  689. BaseCPUPrice: aws.BaseCPUPrice,
  690. BaseRAMPrice: aws.BaseRAMPrice,
  691. BaseGPUPrice: aws.BaseGPUPrice,
  692. UsageType: usageType,
  693. }, nil
  694. }
  695. return &Node{
  696. VCPU: terms.VCpu,
  697. VCPUCost: aws.BaseSpotCPUPrice,
  698. RAM: terms.Memory,
  699. GPU: terms.GPU,
  700. RAMCost: aws.BaseSpotRAMPrice,
  701. Storage: terms.Storage,
  702. BaseCPUPrice: aws.BaseCPUPrice,
  703. BaseRAMPrice: aws.BaseRAMPrice,
  704. BaseGPUPrice: aws.BaseGPUPrice,
  705. UsageType: usageType,
  706. }, nil
  707. }
  708. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  709. if !ok {
  710. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  711. }
  712. cost := c.PricePerUnit.USD
  713. return &Node{
  714. Cost: cost,
  715. VCPU: terms.VCpu,
  716. RAM: terms.Memory,
  717. GPU: terms.GPU,
  718. Storage: terms.Storage,
  719. BaseCPUPrice: aws.BaseCPUPrice,
  720. BaseRAMPrice: aws.BaseRAMPrice,
  721. BaseGPUPrice: aws.BaseGPUPrice,
  722. UsageType: usageType,
  723. }, nil
  724. }
  725. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  726. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  727. aws.DownloadPricingDataLock.RLock()
  728. defer aws.DownloadPricingDataLock.RUnlock()
  729. key := k.Features()
  730. usageType := "ondemand"
  731. if aws.isPreemptible(key) {
  732. usageType = "preemptible"
  733. }
  734. terms, ok := aws.Pricing[key]
  735. if ok {
  736. return aws.createNode(terms, usageType, k)
  737. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  738. aws.DownloadPricingDataLock.RUnlock()
  739. err := aws.DownloadPricingData()
  740. aws.DownloadPricingDataLock.RLock()
  741. if err != nil {
  742. return &Node{
  743. Cost: aws.BaseCPUPrice,
  744. BaseCPUPrice: aws.BaseCPUPrice,
  745. BaseRAMPrice: aws.BaseRAMPrice,
  746. BaseGPUPrice: aws.BaseGPUPrice,
  747. UsageType: usageType,
  748. UsesBaseCPUPrice: true,
  749. }, err
  750. }
  751. terms, termsOk := aws.Pricing[key]
  752. if !termsOk {
  753. return &Node{
  754. Cost: aws.BaseCPUPrice,
  755. BaseCPUPrice: aws.BaseCPUPrice,
  756. BaseRAMPrice: aws.BaseRAMPrice,
  757. BaseGPUPrice: aws.BaseGPUPrice,
  758. UsageType: usageType,
  759. UsesBaseCPUPrice: true,
  760. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  761. }
  762. return aws.createNode(terms, usageType, k)
  763. } else { // Fall back to base pricing if we can't find the key.
  764. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  765. return &Node{
  766. Cost: aws.BaseCPUPrice,
  767. BaseCPUPrice: aws.BaseCPUPrice,
  768. BaseRAMPrice: aws.BaseRAMPrice,
  769. BaseGPUPrice: aws.BaseGPUPrice,
  770. UsageType: usageType,
  771. UsesBaseCPUPrice: true,
  772. }, nil
  773. }
  774. }
  775. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  776. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  777. defaultClusterName := "AWS Cluster #1"
  778. c, err := awsProvider.GetConfig()
  779. if err != nil {
  780. return nil, err
  781. }
  782. remote := os.Getenv(remoteEnabled)
  783. remoteEnabled := false
  784. if os.Getenv(remote) == "true" {
  785. remoteEnabled = true
  786. }
  787. if c.ClusterName != "" {
  788. m := make(map[string]string)
  789. m["name"] = c.ClusterName
  790. m["provider"] = "AWS"
  791. m["id"] = os.Getenv(clusterIDKey)
  792. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  793. return m, nil
  794. }
  795. makeStructure := func(clusterName string) (map[string]string, error) {
  796. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  797. m := make(map[string]string)
  798. m["name"] = clusterName
  799. m["provider"] = "AWS"
  800. m["id"] = os.Getenv(clusterIDKey)
  801. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  802. return m, nil
  803. }
  804. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  805. if len(maybeClusterId) != 0 {
  806. return makeStructure(maybeClusterId)
  807. }
  808. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  809. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  810. nodeList := awsProvider.Clientset.GetAllNodes()
  811. for _, n := range nodeList {
  812. region := ""
  813. instanceId := ""
  814. providerId := n.Spec.ProviderID
  815. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  816. if matchNum == 1 {
  817. region = group
  818. } else if matchNum == 2 {
  819. instanceId = group
  820. }
  821. }
  822. if len(instanceId) == 0 {
  823. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  824. continue
  825. }
  826. c := &aws.Config{
  827. Region: aws.String(region),
  828. }
  829. s := session.Must(session.NewSession(c))
  830. ec2Svc := ec2.New(s)
  831. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  832. InstanceIds: []*string{
  833. aws.String(instanceId),
  834. },
  835. })
  836. if diErr != nil {
  837. // maybe log this?
  838. continue
  839. }
  840. if len(di.Reservations) != 1 {
  841. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  842. continue
  843. }
  844. res := di.Reservations[0]
  845. if len(res.Instances) != 1 {
  846. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  847. continue
  848. }
  849. inst := res.Instances[0]
  850. for _, tag := range inst.Tags {
  851. tagKey := *tag.Key
  852. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  853. if matchNum != 1 {
  854. continue
  855. }
  856. return makeStructure(group)
  857. }
  858. }
  859. }
  860. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  861. return makeStructure(defaultClusterName)
  862. }
  863. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  864. func (*AWS) AddServiceKey(formValues url.Values) error {
  865. keyID := formValues.Get("access_key_ID")
  866. key := formValues.Get("secret_access_key")
  867. m := make(map[string]string)
  868. m["access_key_ID"] = keyID
  869. m["secret_access_key"] = key
  870. result, err := json.Marshal(m)
  871. if err != nil {
  872. return err
  873. }
  874. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  875. }
  876. func configureAWSAuth(keyFile string) error {
  877. jsonFile, err := os.Open(keyFile)
  878. if err != nil {
  879. if os.IsNotExist(err) {
  880. klog.V(2).Infof("Using Default Credentials")
  881. return nil
  882. }
  883. return err
  884. }
  885. defer jsonFile.Close()
  886. byteValue, _ := ioutil.ReadAll(jsonFile)
  887. var result map[string]string
  888. err = json.Unmarshal([]byte(byteValue), &result)
  889. if err != nil {
  890. return err
  891. }
  892. err = os.Setenv(awsAccessKeyIDEnvVar, result["awsServiceKeyName"])
  893. if err != nil {
  894. return err
  895. }
  896. err = os.Setenv(awsAccessKeySecretEnvVar, result["awsServiceKeySecret"])
  897. if err != nil {
  898. return err
  899. }
  900. return nil
  901. }
  // getClusterConfig reads the file at ccFile and decodes it as a JSON object
  // whose values are all strings. Open, read and decode failures are returned
  // as-is together with a nil map.
  func getClusterConfig(ccFile string) (map[string]string, error) {
  	file, openErr := os.Open(ccFile)
  	if openErr != nil {
  		return nil, openErr
  	}
  	defer file.Close()

  	raw, readErr := ioutil.ReadAll(file)
  	if readErr != nil {
  		return nil, readErr
  	}

  	var settings map[string]string
  	if decodeErr := json.Unmarshal(raw, &settings); decodeErr != nil {
  		return nil, decodeErr
  	}
  	return settings, nil
  }
  919. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  920. func (*AWS) GetDisks() ([]byte, error) {
  921. err := configureAWSAuth("/var/configs/key.json")
  922. if err != nil {
  923. return nil, err
  924. }
  925. clusterConfig, err := getClusterConfig("/var/configs/cluster.json")
  926. if err != nil {
  927. return nil, err
  928. }
  929. region := aws.String(clusterConfig["region"])
  930. c := &aws.Config{
  931. Region: region,
  932. }
  933. s := session.Must(session.NewSession(c))
  934. ec2Svc := ec2.New(s)
  935. input := &ec2.DescribeVolumesInput{}
  936. volumeResult, err := ec2Svc.DescribeVolumes(input)
  937. if err != nil {
  938. if aerr, ok := err.(awserr.Error); ok {
  939. switch aerr.Code() {
  940. default:
  941. return nil, aerr
  942. }
  943. } else {
  944. return nil, err
  945. }
  946. }
  947. return json.Marshal(volumeResult)
  948. }
  949. // ConvertToGlueColumnFormat takes a string and runs through various regex
  950. // and string replacement statements to convert it to a format compatible
  951. // with AWS Glue and Athena column names.
  952. // Following guidance from AWS provided here ('Column Names' section):
  953. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  954. // It returns a string containing the column name in proper column name format and length.
  955. func ConvertToGlueColumnFormat(column_name string) string {
  956. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  957. // An underscore is added in front of uppercase letters
  958. capital_underscore := regexp.MustCompile(`[A-Z]`)
  959. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  960. // Any non-alphanumeric characters are replaced with an underscore
  961. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  962. final = no_space_punc.ReplaceAllString(final, "_")
  963. // Duplicate underscores are removed
  964. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  965. final = no_dup_underscore.ReplaceAllString(final, "_")
  966. // Any leading and trailing underscores are removed
  967. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  968. final = no_front_end_underscore.ReplaceAllString(final, "")
  969. // Uppercase to lowercase
  970. final = strings.ToLower(final)
  971. // Longer column name than expected - remove _ left to right
  972. allowed_col_len := 128
  973. undersc_to_remove := len(final) - allowed_col_len
  974. if undersc_to_remove > 0 {
  975. final = strings.Replace(final, "_", "", undersc_to_remove)
  976. }
  977. // If removing all of the underscores still didn't
  978. // make the column name < 128 characters, trim it!
  979. if len(final) > allowed_col_len {
  980. final = final[:allowed_col_len]
  981. }
  982. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  983. return final
  984. }
  985. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  986. // "start" and "end" are dates of the format YYYY-MM-DD
  987. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  988. func (a *AWS) ExternalAllocations(start string, end string, aggregator string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
  989. customPricing, err := a.GetConfig()
  990. if err != nil {
  991. return nil, err
  992. }
  993. aggregator_column_name := "resource_tags_user_" + aggregator
  994. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  995. filter_column_name := "resource_tags_user_" + filterType
  996. filter_column_name = ConvertToGlueColumnFormat(filter_column_name)
  997. var query string
  998. var lastIdx int
  999. if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
  1000. query = fmt.Sprintf(`SELECT
  1001. CAST(line_item_usage_start_date AS DATE) as start_date,
  1002. %s,
  1003. line_item_product_code,
  1004. %s,
  1005. SUM(line_item_blended_cost) as blended_cost
  1006. FROM %s as cost_data
  1007. WHERE (%s='%s' OR %s='') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1008. GROUP BY 1,2,3,4`, aggregator_column_name, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, filter_column_name, start, end)
  1009. lastIdx = 4
  1010. } else {
  1011. lastIdx = 3
  1012. query = fmt.Sprintf(`SELECT
  1013. CAST(line_item_usage_start_date AS DATE) as start_date,
  1014. %s,
  1015. line_item_product_code,
  1016. SUM(line_item_blended_cost) as blended_cost
  1017. FROM %s as cost_data
  1018. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1019. GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
  1020. }
  1021. klog.V(3).Infof("Running Query: %s", query)
  1022. if customPricing.ServiceKeyName != "" {
  1023. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1024. if err != nil {
  1025. return nil, err
  1026. }
  1027. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1028. if err != nil {
  1029. return nil, err
  1030. }
  1031. }
  1032. region := aws.String(customPricing.AthenaRegion)
  1033. resultsBucket := customPricing.AthenaBucketName
  1034. database := customPricing.AthenaDatabase
  1035. c := &aws.Config{
  1036. Region: region,
  1037. }
  1038. s := session.Must(session.NewSession(c))
  1039. svc := athena.New(s)
  1040. var e athena.StartQueryExecutionInput
  1041. var r athena.ResultConfiguration
  1042. r.SetOutputLocation(resultsBucket)
  1043. e.SetResultConfiguration(&r)
  1044. e.SetQueryString(query)
  1045. var q athena.QueryExecutionContext
  1046. q.SetDatabase(database)
  1047. e.SetQueryExecutionContext(&q)
  1048. res, err := svc.StartQueryExecution(&e)
  1049. if err != nil {
  1050. return nil, err
  1051. }
  1052. klog.V(2).Infof("StartQueryExecution result:")
  1053. klog.V(2).Infof(res.GoString())
  1054. var qri athena.GetQueryExecutionInput
  1055. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1056. var qrop *athena.GetQueryExecutionOutput
  1057. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1058. for {
  1059. qrop, err = svc.GetQueryExecution(&qri)
  1060. if err != nil {
  1061. return nil, err
  1062. }
  1063. if *qrop.QueryExecution.Status.State != "RUNNING" {
  1064. break
  1065. }
  1066. time.Sleep(duration)
  1067. }
  1068. var oocAllocs []*OutOfClusterAllocation
  1069. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1070. var ip athena.GetQueryResultsInput
  1071. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1072. op, err := svc.GetQueryResults(&ip)
  1073. if err != nil {
  1074. return nil, err
  1075. }
  1076. if len(op.ResultSet.Rows) > 1 {
  1077. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1078. cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
  1079. if err != nil {
  1080. return nil, err
  1081. }
  1082. ooc := &OutOfClusterAllocation{
  1083. Aggregator: aggregator,
  1084. Environment: *r.Data[1].VarCharValue,
  1085. Service: *r.Data[2].VarCharValue,
  1086. Cost: cost,
  1087. }
  1088. oocAllocs = append(oocAllocs, ooc)
  1089. }
  1090. } else {
  1091. klog.V(1).Infof("No results available for %s at database %s between %s and %s", aggregator_column_name, customPricing.AthenaTable, start, end)
  1092. }
  1093. }
  1094. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  1095. }
  1096. // QuerySQL can query a properly configured Athena database.
  1097. // Used to fetch billing data.
  1098. // Requires a json config in /var/configs with key region, output, and database.
  1099. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1100. customPricing, err := a.GetConfig()
  1101. if err != nil {
  1102. return nil, err
  1103. }
  1104. if customPricing.ServiceKeyName != "" {
  1105. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1106. if err != nil {
  1107. return nil, err
  1108. }
  1109. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1110. if err != nil {
  1111. return nil, err
  1112. }
  1113. }
  1114. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1115. if err != nil {
  1116. return nil, err
  1117. }
  1118. defer athenaConfigs.Close()
  1119. b, err := ioutil.ReadAll(athenaConfigs)
  1120. if err != nil {
  1121. return nil, err
  1122. }
  1123. var athenaConf map[string]string
  1124. json.Unmarshal([]byte(b), &athenaConf)
  1125. region := aws.String(customPricing.AthenaRegion)
  1126. resultsBucket := customPricing.AthenaBucketName
  1127. database := customPricing.AthenaDatabase
  1128. c := &aws.Config{
  1129. Region: region,
  1130. }
  1131. s := session.Must(session.NewSession(c))
  1132. svc := athena.New(s)
  1133. var e athena.StartQueryExecutionInput
  1134. var r athena.ResultConfiguration
  1135. r.SetOutputLocation(resultsBucket)
  1136. e.SetResultConfiguration(&r)
  1137. e.SetQueryString(query)
  1138. var q athena.QueryExecutionContext
  1139. q.SetDatabase(database)
  1140. e.SetQueryExecutionContext(&q)
  1141. res, err := svc.StartQueryExecution(&e)
  1142. if err != nil {
  1143. return nil, err
  1144. }
  1145. klog.V(2).Infof("StartQueryExecution result:")
  1146. klog.V(2).Infof(res.GoString())
  1147. var qri athena.GetQueryExecutionInput
  1148. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1149. var qrop *athena.GetQueryExecutionOutput
  1150. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1151. for {
  1152. qrop, err = svc.GetQueryExecution(&qri)
  1153. if err != nil {
  1154. return nil, err
  1155. }
  1156. if *qrop.QueryExecution.Status.State != "RUNNING" {
  1157. break
  1158. }
  1159. time.Sleep(duration)
  1160. }
  1161. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1162. var ip athena.GetQueryResultsInput
  1163. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1164. op, err := svc.GetQueryResults(&ip)
  1165. if err != nil {
  1166. return nil, err
  1167. }
  1168. b, err := json.Marshal(op.ResultSet)
  1169. if err != nil {
  1170. return nil, err
  1171. }
  1172. return b, nil
  1173. }
  1174. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1175. }
  // spotInfo is one record of the spot data feed as decoded by parseSpotData
  // from gzip-compressed, tab-separated S3 objects. Every column is kept as the
  // raw string read from the file; the csv tags name the columns in file order.
  type spotInfo struct {
  	Timestamp string `csv:"Timestamp"`
  	UsageType string `csv:"UsageType"`
  	Operation string `csv:"Operation"`
  	// InstanceID keys the map returned by parseSpotData.
  	InstanceID string `csv:"InstanceID"`
  	MyBidID    string `csv:"MyBidID"`
  	MyMaxPrice string `csv:"MyMaxPrice"`
  	MarketPrice string `csv:"MarketPrice"`
  	// Charge is read by createNode, which uses the first of exactly two
  	// space-separated tokens as the node cost.
  	Charge string `csv:"Charge"`
  	// Version is the per-record column; presumably distinct from the
  	// "#Version" comment line checked in parseSpotData -- TODO confirm.
  	Version string `csv:"Version"`
  }
  1187. type fnames []*string
  1188. func (f fnames) Len() int {
  1189. return len(f)
  1190. }
  1191. func (f fnames) Swap(i, j int) {
  1192. f[i], f[j] = f[j], f[i]
  1193. }
  1194. func (f fnames) Less(i, j int) bool {
  1195. key1 := strings.Split(*f[i], ".")
  1196. key2 := strings.Split(*f[j], ".")
  1197. t1, err := time.Parse("2006-01-02-15", key1[1])
  1198. if err != nil {
  1199. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1200. return false
  1201. }
  1202. t2, err := time.Parse("2006-01-02-15", key2[1])
  1203. if err != nil {
  1204. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1205. return false
  1206. }
  1207. return t1.Before(t2)
  1208. }
  1209. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1210. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1211. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  1212. if err != nil {
  1213. return nil, err
  1214. }
  1215. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  1216. if err != nil {
  1217. return nil, err
  1218. }
  1219. }
  1220. s3Prefix := projectID
  1221. if len(prefix) != 0 {
  1222. s3Prefix = prefix + "/" + s3Prefix
  1223. }
  1224. c := aws.NewConfig().WithRegion(region)
  1225. s := session.Must(session.NewSession(c))
  1226. s3Svc := s3.New(s)
  1227. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1228. tNow := time.Now()
  1229. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1230. ls := &s3.ListObjectsInput{
  1231. Bucket: aws.String(bucket),
  1232. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1233. }
  1234. ls2 := &s3.ListObjectsInput{
  1235. Bucket: aws.String(bucket),
  1236. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1237. }
  1238. lso, err := s3Svc.ListObjects(ls)
  1239. if err != nil {
  1240. return nil, err
  1241. }
  1242. lsoLen := len(lso.Contents)
  1243. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1244. if lsoLen == 0 {
  1245. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1246. }
  1247. lso2, err := s3Svc.ListObjects(ls2)
  1248. if err != nil {
  1249. return nil, err
  1250. }
  1251. lso2Len := len(lso2.Contents)
  1252. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1253. if lso2Len == 0 {
  1254. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1255. }
  1256. var keys []*string
  1257. for _, obj := range lso.Contents {
  1258. keys = append(keys, obj.Key)
  1259. }
  1260. for _, obj := range lso2.Contents {
  1261. keys = append(keys, obj.Key)
  1262. }
  1263. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1264. header, err := csvutil.Header(spotInfo{}, "csv")
  1265. if err != nil {
  1266. return nil, err
  1267. }
  1268. fieldsPerRecord := len(header)
  1269. spots := make(map[string]*spotInfo)
  1270. for _, key := range keys {
  1271. getObj := &s3.GetObjectInput{
  1272. Bucket: aws.String(bucket),
  1273. Key: key,
  1274. }
  1275. buf := aws.NewWriteAtBuffer([]byte{})
  1276. _, err := downloader.Download(buf, getObj)
  1277. if err != nil {
  1278. return nil, err
  1279. }
  1280. r := bytes.NewReader(buf.Bytes())
  1281. gr, err := gzip.NewReader(r)
  1282. if err != nil {
  1283. return nil, err
  1284. }
  1285. csvReader := csv.NewReader(gr)
  1286. csvReader.Comma = '\t'
  1287. csvReader.FieldsPerRecord = fieldsPerRecord
  1288. dec, err := csvutil.NewDecoder(csvReader, header...)
  1289. if err != nil {
  1290. return nil, err
  1291. }
  1292. var foundVersion string
  1293. for {
  1294. spot := spotInfo{}
  1295. err := dec.Decode(&spot)
  1296. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1297. if err == io.EOF {
  1298. break
  1299. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1300. rec := dec.Record()
  1301. // the first two "Record()" will be the comment lines
  1302. // and they show up as len() == 1
  1303. // the first of which is "#Version"
  1304. // the second of which is "#Fields: "
  1305. if len(rec) != 1 {
  1306. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1307. continue
  1308. }
  1309. if len(foundVersion) == 0 {
  1310. spotFeedVersion := rec[0]
  1311. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1312. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1313. if matches != nil {
  1314. foundVersion = matches[1]
  1315. if foundVersion != supportedSpotFeedVersion {
  1316. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1317. break
  1318. }
  1319. }
  1320. continue
  1321. } else if strings.Index(rec[0], "#") == 0 {
  1322. continue
  1323. } else {
  1324. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1325. continue
  1326. }
  1327. } else if err != nil {
  1328. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1329. continue
  1330. }
  1331. klog.V(4).Infof("Found spot info %+v", spot)
  1332. spots[spot.InstanceID] = &spot
  1333. }
  1334. gr.Close()
  1335. }
  1336. return spots, nil
  1337. }
  1338. func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1339. numReserved := len(a.ReservedInstances)
  1340. // Early return if no reserved instance data loaded
  1341. if numReserved == 0 {
  1342. klog.V(4).Infof("[Reserved] No Reserved Instances")
  1343. return
  1344. }
  1345. cfg, err := a.GetConfig()
  1346. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  1347. if err != nil {
  1348. klog.V(3).Infof("Could not parse default cpu price")
  1349. defaultCPU = 0.031611
  1350. }
  1351. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  1352. if err != nil {
  1353. klog.V(3).Infof("Could not parse default ram price")
  1354. defaultRAM = 0.004237
  1355. }
  1356. cpuToRAMRatio := defaultCPU / defaultRAM
  1357. now := time.Now()
  1358. instances := make(map[string][]*AWSReservedInstance)
  1359. for _, r := range a.ReservedInstances {
  1360. if now.Before(r.StartDate) || now.After(r.EndDate) {
  1361. klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
  1362. continue
  1363. }
  1364. _, ok := instances[r.Region]
  1365. if !ok {
  1366. instances[r.Region] = []*AWSReservedInstance{r}
  1367. } else {
  1368. instances[r.Region] = append(instances[r.Region], r)
  1369. }
  1370. }
  1371. awsNodes := make(map[string]*v1.Node)
  1372. currentNodes := a.Clientset.GetAllNodes()
  1373. // Create a node name -> node map
  1374. for _, awsNode := range currentNodes {
  1375. awsNodes[awsNode.GetName()] = awsNode
  1376. }
  1377. // go through all provider nodes using k8s nodes for region
  1378. for nodeName, node := range nodes {
  1379. // Reset reserved allocation to prevent double allocation
  1380. node.Reserved = nil
  1381. kNode, ok := awsNodes[nodeName]
  1382. if !ok {
  1383. klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
  1384. continue
  1385. }
  1386. nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
  1387. if !ok {
  1388. klog.V(1).Infof("[Reserved] Could not find node region")
  1389. continue
  1390. }
  1391. reservedInstances, ok := instances[nodeRegion]
  1392. if !ok {
  1393. klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
  1394. continue
  1395. }
  1396. // Determine the InstanceType of the node
  1397. instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
  1398. if !ok {
  1399. continue
  1400. }
  1401. ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
  1402. if err != nil {
  1403. continue
  1404. }
  1405. ramGB := ramBytes / 1024 / 1024 / 1024
  1406. cpu, err := strconv.ParseFloat(node.VCPU, 64)
  1407. if err != nil {
  1408. continue
  1409. }
  1410. ramMultiple := cpu*cpuToRAMRatio + ramGB
  1411. node.Reserved = &ReservedInstanceData{
  1412. ReservedCPU: 0,
  1413. ReservedRAM: 0,
  1414. }
  1415. for i, reservedInstance := range reservedInstances {
  1416. if reservedInstance.InstanceType == instanceType {
  1417. // Use < 0 to mark as ALL
  1418. node.Reserved.ReservedCPU = -1
  1419. node.Reserved.ReservedRAM = -1
  1420. // Set Costs based on CPU/RAM ratios
  1421. ramPrice := reservedInstance.PricePerHour / ramMultiple
  1422. node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
  1423. node.Reserved.RAMCost = ramPrice
  1424. // Remove the reserve from the temporary slice to prevent
  1425. // being reallocated
  1426. instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
  1427. break
  1428. }
  1429. }
  1430. }
  1431. }
  // AWSReservedInstance describes one EC2 reserved instance purchase together
  // with the hourly price resolved for it.
  type AWSReservedInstance struct {
  	// Zone is the availability zone of the reservation; empty when none is set.
  	Zone string
  	// Region is the region the reservation was queried from.
  	Region string
  	// InstanceType is the EC2 instance type the reservation applies to.
  	InstanceType string
  	// InstanceCount is the number of instances in the reservation.
  	InstanceCount int64
  	// InstanceTenacy holds the tenancy of the reservation.
  	InstanceTenacy string
  	// StartDate and EndDate bound the period in which the reservation is valid.
  	StartDate time.Time
  	EndDate   time.Time
  	// PricePerHour is the resolved hourly price of the reservation.
  	PricePerHour float64
  }

  // String renders every field of the reservation on a single bracketed line.
  func (ari *AWSReservedInstance) String() string {
  	const layout = "[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]"
  	return fmt.Sprintf(
  		layout,
  		ari.Zone,
  		ari.Region,
  		ari.InstanceType,
  		ari.InstanceCount,
  		ari.InstanceTenacy,
  		ari.StartDate,
  		ari.EndDate,
  		ari.PricePerHour,
  	)
  }
  1445. func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
  1446. return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
  1447. }
  1448. func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
  1449. var pricePerHour float64
  1450. if len(ri.RecurringCharges) > 0 {
  1451. for _, rc := range ri.RecurringCharges {
  1452. if isReservedInstanceHourlyPrice(rc) {
  1453. pricePerHour = *rc.Amount
  1454. break
  1455. }
  1456. }
  1457. }
  1458. // If we're still unable to resolve hourly price, try fixed -> hourly
  1459. if pricePerHour == 0 {
  1460. if ri.Duration != nil && ri.FixedPrice != nil {
  1461. var durHours float64
  1462. durSeconds := float64(*ri.Duration)
  1463. fixedPrice := float64(*ri.FixedPrice)
  1464. if durSeconds != 0 && fixedPrice != 0 {
  1465. durHours = durSeconds / 60 / 60
  1466. pricePerHour = fixedPrice / durHours
  1467. }
  1468. }
  1469. }
  1470. if pricePerHour == 0 {
  1471. return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
  1472. }
  1473. return pricePerHour, nil
  1474. }
  1475. func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
  1476. c := &aws.Config{
  1477. Region: aws.String(region),
  1478. }
  1479. s := session.Must(session.NewSession(c))
  1480. svc := ec2.New(s)
  1481. response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
  1482. if err != nil {
  1483. return nil, err
  1484. }
  1485. var reservedInstances []*AWSReservedInstance
  1486. for _, ri := range response.ReservedInstances {
  1487. var zone string
  1488. if ri.AvailabilityZone != nil {
  1489. zone = *ri.AvailabilityZone
  1490. }
  1491. pricePerHour, err := getReservedInstancePrice(ri)
  1492. if err != nil {
  1493. klog.V(1).Infof("Error Resolving Price: %s", err.Error())
  1494. continue
  1495. }
  1496. reservedInstances = append(reservedInstances, &AWSReservedInstance{
  1497. Zone: zone,
  1498. Region: region,
  1499. InstanceType: *ri.InstanceType,
  1500. InstanceCount: *ri.InstanceCount,
  1501. InstanceTenacy: *ri.InstanceTenancy,
  1502. StartDate: *ri.Start,
  1503. EndDate: *ri.End,
  1504. PricePerHour: pricePerHour,
  1505. })
  1506. }
  1507. return reservedInstances, nil
  1508. }
  1509. func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
  1510. err := configureAWSAuth("/var/configs/aws.json")
  1511. if err != nil {
  1512. return nil, err
  1513. }
  1514. var reservedInstances []*AWSReservedInstance
  1515. nodes := a.Clientset.GetAllNodes()
  1516. regionsSeen := make(map[string]bool)
  1517. for _, node := range nodes {
  1518. region, ok := node.Labels[v1.LabelZoneRegion]
  1519. if !ok {
  1520. continue
  1521. }
  1522. if regionsSeen[region] {
  1523. continue
  1524. }
  1525. ris, err := getRegionReservedInstances(region)
  1526. if err != nil {
  1527. klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
  1528. continue
  1529. }
  1530. regionsSeen[region] = true
  1531. reservedInstances = append(reservedInstances, ris...)
  1532. }
  1533. return reservedInstances, nil
  1534. }