csvprovider.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/kubecost/cost-model/pkg/env"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "github.com/aws/aws-sdk-go/aws"
  15. "github.com/aws/aws-sdk-go/aws/session"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. "github.com/kubecost/cost-model/pkg/log"
  18. v1 "k8s.io/api/core/v1"
  19. "github.com/jszwec/csvutil"
  20. )
  // refreshMinutes is the delay, in minutes, before DownloadPricingData
  // schedules its next reload of the pricing CSV. It is deliberately an
  // untyped constant so it can be multiplied by time.Minute directly.
  const (
  	refreshMinutes = 60
  )
  22. type CSVProvider struct {
  23. *CustomProvider
  24. CSVLocation string
  25. Pricing map[string]*price
  26. NodeClassPricing map[string]float64
  27. NodeClassCount map[string]float64
  28. NodeMapField string
  29. PricingPV map[string]*price
  30. PVMapField string
  31. UsesRegion bool
  32. DownloadPricingDataLock sync.RWMutex
  33. }
  // price is one row of the pricing CSV.
  //
  // Field order is significant: DownloadPricingData builds its decoder header
  // with csvutil.Header(price{}, "csv"), which follows these fields in
  // declaration order. Do not reorder.
  type price struct {
  	EndTimestamp      string `csv:"EndTimestamp"`
  	InstanceID        string `csv:"InstanceID"`        // matched against the value selected by InstanceIDField
  	Region            string `csv:"Region"`            // optional; when set, lookup keys become "region,instanceID"
  	AssetClass        string `csv:"AssetClass"`        // "node" or "pv"; other values are treated as nodes
  	InstanceIDField   string `csv:"InstanceIDField"`   // field path such as spec.providerID or metadata.labels.<key>
  	InstanceType      string `csv:"InstanceType"`      // part of the node class key
  	MarketPriceHourly string `csv:"MarketPriceHourly"` // hourly cost as a decimal string
  	Version           string `csv:"Version"`
  }
  // GetCsv opens the local CSV file at location for reading.
  //
  // The returned reader is an *os.File; the caller owns it and should close
  // it (e.g. through an io.Closer type assertion) when finished. On failure
  // the reader is a true nil interface rather than an interface wrapping a
  // nil *os.File, so `r == nil` checks behave as expected.
  func GetCsv(location string) (io.Reader, error) {
  	f, err := os.Open(location)
  	if err != nil {
  		return nil, err
  	}
  	return f, nil
  }
  47. func (c *CSVProvider) DownloadPricingData() error {
  48. c.DownloadPricingDataLock.Lock()
  49. defer c.DownloadPricingDataLock.Unlock()
  50. pricing := make(map[string]*price)
  51. nodeclasspricing := make(map[string]float64)
  52. nodeclasscount := make(map[string]float64)
  53. pvpricing := make(map[string]*price)
  54. header, err := csvutil.Header(price{}, "csv")
  55. if err != nil {
  56. return err
  57. }
  58. fieldsPerRecord := len(header)
  59. var csvr io.Reader
  60. var csverr error
  61. if strings.HasPrefix(c.CSVLocation, "s3://") {
  62. region := env.GetCSVRegion()
  63. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  64. s3Client := s3.New(session.New(conf))
  65. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  66. if len(bucketAndKey) == 2 {
  67. out, err := s3Client.GetObject(&s3.GetObjectInput{
  68. Bucket: aws.String(bucketAndKey[0]),
  69. Key: aws.String(bucketAndKey[1]),
  70. })
  71. csverr = err
  72. csvr = out.Body
  73. } else {
  74. c.Pricing = pricing
  75. c.NodeClassPricing = nodeclasspricing
  76. c.NodeClassCount = nodeclasscount
  77. c.PricingPV = pvpricing
  78. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  79. }
  80. } else {
  81. csvr, csverr = GetCsv(c.CSVLocation)
  82. }
  83. if csverr != nil {
  84. log.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  85. c.Pricing = pricing
  86. c.NodeClassPricing = nodeclasspricing
  87. c.NodeClassCount = nodeclasscount
  88. c.PricingPV = pvpricing
  89. return nil
  90. }
  91. csvReader := csv.NewReader(csvr)
  92. csvReader.Comma = ','
  93. csvReader.FieldsPerRecord = fieldsPerRecord
  94. dec, err := csvutil.NewDecoder(csvReader, header...)
  95. if err != nil {
  96. c.Pricing = pricing
  97. c.NodeClassPricing = nodeclasspricing
  98. c.NodeClassCount = nodeclasscount
  99. c.PricingPV = pvpricing
  100. return err
  101. }
  102. for {
  103. p := price{}
  104. err := dec.Decode(&p)
  105. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  106. if err == io.EOF {
  107. break
  108. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  109. rec := dec.Record()
  110. if len(rec) != 1 {
  111. log.Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  112. continue
  113. }
  114. if strings.Index(rec[0], "#") == 0 {
  115. continue
  116. } else {
  117. log.Infof("skipping non-CSV line: %s", rec)
  118. continue
  119. }
  120. } else if err != nil {
  121. log.Infof("Error during spot info decode: %+v", err)
  122. continue
  123. }
  124. log.Infof("Found price info %+v", p)
  125. key := strings.ToLower(p.InstanceID)
  126. if p.Region != "" { // strip the casing from region and add to key.
  127. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  128. c.UsesRegion = true
  129. }
  130. if p.AssetClass == "pv" {
  131. pvpricing[key] = &p
  132. c.PVMapField = p.InstanceIDField
  133. } else if p.AssetClass == "node" {
  134. pricing[key] = &p
  135. classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
  136. cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
  137. if err != nil {
  138. } else {
  139. if _, ok := nodeclasspricing[classKey]; ok {
  140. oldPrice := nodeclasspricing[classKey]
  141. oldCount := nodeclasscount[classKey]
  142. newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
  143. nodeclasscount[classKey] = newPrice
  144. nodeclasscount[classKey]++
  145. } else {
  146. nodeclasspricing[classKey] = cost
  147. nodeclasscount[classKey] = 1
  148. }
  149. }
  150. c.NodeMapField = p.InstanceIDField
  151. } else {
  152. log.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  153. pricing[key] = &p
  154. c.NodeMapField = p.InstanceIDField
  155. }
  156. }
  157. if len(pricing) > 0 {
  158. c.Pricing = pricing
  159. c.NodeClassPricing = nodeclasspricing
  160. c.NodeClassCount = nodeclasscount
  161. c.PricingPV = pvpricing
  162. } else {
  163. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  164. }
  165. time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  166. return nil
  167. }
  // csvKey is the Key the CSV provider hands out for nodes (built by GetKey).
  type csvKey struct {
  	// Labels are the node's labels; Features derives region and instance
  	// type from them.
  	Labels map[string]string
  	// ProviderID is the node value selected by NodeMapField, possibly
  	// prefixed with "region,".
  	ProviderID string
  }
  172. func (k *csvKey) Features() string {
  173. instanceType, _ := util.GetInstanceType(k.Labels)
  174. region, _ := util.GetRegion(k.Labels)
  175. class := "node"
  176. return region + "," + instanceType + "," + class
  177. }
  178. func (k *csvKey) GPUType() string {
  179. return ""
  180. }
  181. func (k *csvKey) ID() string {
  182. return k.ProviderID
  183. }
  184. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  185. c.DownloadPricingDataLock.RLock()
  186. defer c.DownloadPricingDataLock.RUnlock()
  187. if p, ok := c.Pricing[key.ID()]; ok {
  188. return &Node{
  189. Cost: p.MarketPriceHourly,
  190. PricingType: CsvExact,
  191. }, nil
  192. }
  193. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  194. if len(s) == 2 {
  195. if p, ok := c.Pricing[s[1]]; ok {
  196. return &Node{
  197. Cost: p.MarketPriceHourly,
  198. PricingType: CsvExact,
  199. }, nil
  200. }
  201. }
  202. classKey := key.Features() // Use node attributes to try and do a class match
  203. if cost, ok := c.NodeClassPricing[classKey]; ok {
  204. log.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
  205. return &Node{
  206. Cost: fmt.Sprintf("%f", cost),
  207. PricingType: CsvClass,
  208. }, nil
  209. }
  210. return nil, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
  211. }
  212. func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
  213. mf := strings.Split(m, ".")
  214. toReturn := ""
  215. if useRegion {
  216. if region, ok := util.GetRegion(n.Labels); ok {
  217. toReturn = region + ","
  218. } else {
  219. log.Errorf("Getting region based on labels failed")
  220. }
  221. }
  222. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  223. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  224. for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
  225. if matchNum == 2 {
  226. return toReturn + group
  227. }
  228. }
  229. if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
  230. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.Spec.ProviderID, "azure://"))
  231. return toReturn + vmOrScaleSet
  232. }
  233. return toReturn + n.Spec.ProviderID
  234. } else if len(mf) > 1 && mf[0] == "metadata" {
  235. if mf[1] == "name" {
  236. return toReturn + n.Name
  237. } else if mf[1] == "labels" {
  238. lkey := strings.Join(mf[2:len(mf)], "")
  239. return toReturn + n.Labels[lkey]
  240. } else if mf[1] == "annotations" {
  241. akey := strings.Join(mf[2:len(mf)], "")
  242. return toReturn + n.Annotations[akey]
  243. } else {
  244. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  245. return ""
  246. }
  247. } else {
  248. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  249. return ""
  250. }
  251. }
  252. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  253. mf := strings.Split(m, ".")
  254. if len(mf) > 1 && mf[0] == "metadata" {
  255. if mf[1] == "name" {
  256. return n.Name
  257. } else if mf[1] == "labels" {
  258. lkey := strings.Join(mf[2:len(mf)], "")
  259. return n.Labels[lkey]
  260. } else if mf[1] == "annotations" {
  261. akey := strings.Join(mf[2:len(mf)], "")
  262. return n.Annotations[akey]
  263. } else {
  264. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  265. return ""
  266. }
  267. } else if len(mf) > 2 && mf[0] == "spec" {
  268. if mf[1] == "capacity" && mf[2] == "storage" {
  269. skey := n.Spec.Capacity["storage"]
  270. return skey.String()
  271. } else {
  272. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  273. return ""
  274. }
  275. } else {
  276. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  277. return ""
  278. }
  279. }
  280. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  281. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  282. return &csvKey{
  283. ProviderID: id,
  284. Labels: l,
  285. }
  286. }
  // csvPVKey is the PVKey the CSV provider hands out for persistent volumes
  // (built by GetPVKey). Field order is unchanged; same-typed neighbours are
  // merged onto one line.
  type csvPVKey struct {
  	Labels                       map[string]string
  	ProviderID, StorageClassName string // ProviderID: PV value selected by PVMapField
  	StorageClassParameters       map[string]string
  	Name, DefaultRegion          string
  }
  295. func (key *csvPVKey) ID() string {
  296. return ""
  297. }
  298. func (key *csvPVKey) GetStorageClass() string {
  299. return key.StorageClassName
  300. }
  301. func (key *csvPVKey) Features() string {
  302. return key.ProviderID
  303. }
  304. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  305. id := PVValueFromMapField(c.PVMapField, pv)
  306. return &csvPVKey{
  307. Labels: pv.Labels,
  308. ProviderID: id,
  309. StorageClassName: pv.Spec.StorageClassName,
  310. StorageClassParameters: parameters,
  311. Name: pv.Name,
  312. DefaultRegion: defaultRegion,
  313. }
  314. }
  315. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  316. c.DownloadPricingDataLock.RLock()
  317. defer c.DownloadPricingDataLock.RUnlock()
  318. pricing, ok := c.PricingPV[pvk.Features()]
  319. if !ok {
  320. log.Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  321. return &PV{}, nil
  322. }
  323. return &PV{
  324. Cost: pricing.MarketPriceHourly,
  325. }, nil
  326. }
  327. func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
  328. return &ServiceAccountStatus{
  329. Checks: []*ServiceAccountCheck{},
  330. }
  331. }
  332. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  333. return "", 0.0, nil
  334. }
  335. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  336. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  337. }
  338. func (c *CSVProvider) ParseID(id string) string {
  339. return id
  340. }
  341. func (c *CSVProvider) ParsePVID(id string) string {
  342. return id
  343. }
  344. func (c *CSVProvider) ParseLBID(id string) string {
  345. return id
  346. }