csvprovider.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/kubecost/cost-model/pkg/env"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "github.com/aws/aws-sdk-go/aws"
  15. "github.com/aws/aws-sdk-go/aws/session"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. "github.com/kubecost/cost-model/pkg/log"
  18. v1 "k8s.io/api/core/v1"
  19. "k8s.io/klog"
  20. "github.com/jszwec/csvutil"
  21. )
  // refreshMinutes is the delay, in minutes, between CSV pricing reloads; it is
  // multiplied by time.Minute when the reload timer is armed.
  const (
  	refreshMinutes = 60
  )
  23. type CSVProvider struct {
  24. *CustomProvider
  25. CSVLocation string
  26. Pricing map[string]*price
  27. NodeClassPricing map[string]float64
  28. NodeClassCount map[string]float64
  29. NodeMapField string
  30. PricingPV map[string]*price
  31. PVMapField string
  32. UsesRegion bool
  33. DownloadPricingDataLock sync.RWMutex
  34. }
  // price is one decoded row of the pricing CSV. Every column is kept as the raw string
  // from the file. Field order matters: the decoder's header (and therefore the expected
  // column order and count) is derived from these csv tags.
  type price struct {
  	EndTimestamp      string `csv:"EndTimestamp"`
  	InstanceID        string `csv:"InstanceID"`        // matched against the node/PV lookup ID
  	Region            string `csv:"Region"`            // optional; when set, lowercased and prefixed to the key
  	AssetClass        string `csv:"AssetClass"`        // "node" or "pv"; other values are treated as nodes
  	InstanceIDField   string `csv:"InstanceIDField"`   // field path used to derive lookup IDs
  	InstanceType      string `csv:"InstanceType"`      // combined with Region for class-average node pricing
  	MarketPriceHourly string `csv:"MarketPriceHourly"` // hourly cost, returned verbatim as the price
  	Version           string `csv:"Version"`
  }
  // GetCsv opens the local file at location for reading.
  //
  // On success the returned io.Reader is an *os.File; the caller owns it and should close
  // it (e.g. via an io.Closer type assertion). On failure the reader is a true nil
  // interface, rather than a nil *os.File wrapped in a non-nil io.Reader, so `r == nil`
  // checks behave as expected.
  func GetCsv(location string) (io.Reader, error) {
  	f, err := os.Open(location)
  	if err != nil {
  		return nil, err
  	}
  	return f, nil
  }
  48. func (c *CSVProvider) DownloadPricingData() error {
  49. c.DownloadPricingDataLock.Lock()
  50. defer c.DownloadPricingDataLock.Unlock()
  51. pricing := make(map[string]*price)
  52. nodeclasspricing := make(map[string]float64)
  53. nodeclasscount := make(map[string]float64)
  54. pvpricing := make(map[string]*price)
  55. header, err := csvutil.Header(price{}, "csv")
  56. if err != nil {
  57. return err
  58. }
  59. fieldsPerRecord := len(header)
  60. var csvr io.Reader
  61. var csverr error
  62. if strings.HasPrefix(c.CSVLocation, "s3://") {
  63. region := env.GetCSVRegion()
  64. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  65. s3Client := s3.New(session.New(conf))
  66. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  67. if len(bucketAndKey) == 2 {
  68. out, err := s3Client.GetObject(&s3.GetObjectInput{
  69. Bucket: aws.String(bucketAndKey[0]),
  70. Key: aws.String(bucketAndKey[1]),
  71. })
  72. csverr = err
  73. csvr = out.Body
  74. } else {
  75. c.Pricing = pricing
  76. c.NodeClassPricing = nodeclasspricing
  77. c.NodeClassCount = nodeclasscount
  78. c.PricingPV = pvpricing
  79. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  80. }
  81. } else {
  82. csvr, csverr = GetCsv(c.CSVLocation)
  83. }
  84. if csverr != nil {
  85. klog.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  86. c.Pricing = pricing
  87. c.NodeClassPricing = nodeclasspricing
  88. c.NodeClassCount = nodeclasscount
  89. c.PricingPV = pvpricing
  90. return nil
  91. }
  92. csvReader := csv.NewReader(csvr)
  93. csvReader.Comma = ','
  94. csvReader.FieldsPerRecord = fieldsPerRecord
  95. dec, err := csvutil.NewDecoder(csvReader, header...)
  96. if err != nil {
  97. c.Pricing = pricing
  98. c.NodeClassPricing = nodeclasspricing
  99. c.NodeClassCount = nodeclasscount
  100. c.PricingPV = pvpricing
  101. return err
  102. }
  103. for {
  104. p := price{}
  105. err := dec.Decode(&p)
  106. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  107. if err == io.EOF {
  108. break
  109. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  110. rec := dec.Record()
  111. if len(rec) != 1 {
  112. klog.V(2).Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  113. continue
  114. }
  115. if strings.Index(rec[0], "#") == 0 {
  116. continue
  117. } else {
  118. klog.V(3).Infof("skipping non-CSV line: %s", rec)
  119. continue
  120. }
  121. } else if err != nil {
  122. klog.V(2).Infof("Error during spot info decode: %+v", err)
  123. continue
  124. }
  125. klog.V(4).Infof("Found price info %+v", p)
  126. key := strings.ToLower(p.InstanceID)
  127. if p.Region != "" { // strip the casing from region and add to key.
  128. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  129. c.UsesRegion = true
  130. }
  131. if p.AssetClass == "pv" {
  132. pvpricing[key] = &p
  133. c.PVMapField = p.InstanceIDField
  134. } else if p.AssetClass == "node" {
  135. pricing[key] = &p
  136. classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
  137. cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
  138. if err != nil {
  139. } else {
  140. if _, ok := nodeclasspricing[classKey]; ok {
  141. oldPrice := nodeclasspricing[classKey]
  142. oldCount := nodeclasscount[classKey]
  143. newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
  144. nodeclasscount[classKey] = newPrice
  145. nodeclasscount[classKey]++
  146. } else {
  147. nodeclasspricing[classKey] = cost
  148. nodeclasscount[classKey] = 1
  149. }
  150. }
  151. c.NodeMapField = p.InstanceIDField
  152. } else {
  153. klog.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  154. pricing[key] = &p
  155. c.NodeMapField = p.InstanceIDField
  156. }
  157. }
  158. if len(pricing) > 0 {
  159. c.Pricing = pricing
  160. c.NodeClassPricing = nodeclasspricing
  161. c.NodeClassCount = nodeclasscount
  162. c.PricingPV = pvpricing
  163. } else {
  164. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  165. }
  166. time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  167. return nil
  168. }
  169. type csvKey struct {
  170. Labels map[string]string
  171. ProviderID string
  172. }
  173. func (k *csvKey) Features() string {
  174. instanceType, _ := util.GetInstanceType(k.Labels)
  175. region, _ := util.GetRegion(k.Labels)
  176. class := "node"
  177. return region + "," + instanceType + "," + class
  178. }
  179. func (k *csvKey) GPUType() string {
  180. return ""
  181. }
  182. func (k *csvKey) ID() string {
  183. return k.ProviderID
  184. }
  185. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  186. c.DownloadPricingDataLock.RLock()
  187. defer c.DownloadPricingDataLock.RUnlock()
  188. if p, ok := c.Pricing[key.ID()]; ok {
  189. return &Node{
  190. Cost: p.MarketPriceHourly,
  191. PricingType: CsvExact,
  192. }, nil
  193. }
  194. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  195. if len(s) == 2 {
  196. if p, ok := c.Pricing[s[1]]; ok {
  197. return &Node{
  198. Cost: p.MarketPriceHourly,
  199. PricingType: CsvExact,
  200. }, nil
  201. }
  202. }
  203. classKey := key.Features() // Use node attributes to try and do a class match
  204. if cost, ok := c.NodeClassPricing[classKey]; ok {
  205. klog.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
  206. return &Node{
  207. Cost: fmt.Sprintf("%f", cost),
  208. PricingType: CsvClass,
  209. }, nil
  210. }
  211. return nil, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
  212. }
  213. func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
  214. mf := strings.Split(m, ".")
  215. toReturn := ""
  216. if useRegion {
  217. toReturn = n.Labels[v1.LabelZoneRegion] + ","
  218. }
  219. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  220. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  221. for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
  222. if matchNum == 2 {
  223. return toReturn + group
  224. }
  225. }
  226. if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
  227. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.Spec.ProviderID, "azure://"))
  228. return toReturn + vmOrScaleSet
  229. }
  230. return toReturn + n.Spec.ProviderID
  231. } else if len(mf) > 1 && mf[0] == "metadata" {
  232. if mf[1] == "name" {
  233. return toReturn + n.Name
  234. } else if mf[1] == "labels" {
  235. lkey := strings.Join(mf[2:len(mf)], "")
  236. return toReturn + n.Labels[lkey]
  237. } else if mf[1] == "annotations" {
  238. akey := strings.Join(mf[2:len(mf)], "")
  239. return toReturn + n.Annotations[akey]
  240. } else {
  241. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  242. return ""
  243. }
  244. } else {
  245. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  246. return ""
  247. }
  248. }
  249. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  250. mf := strings.Split(m, ".")
  251. if len(mf) > 1 && mf[0] == "metadata" {
  252. if mf[1] == "name" {
  253. return n.Name
  254. } else if mf[1] == "labels" {
  255. lkey := strings.Join(mf[2:len(mf)], "")
  256. return n.Labels[lkey]
  257. } else if mf[1] == "annotations" {
  258. akey := strings.Join(mf[2:len(mf)], "")
  259. return n.Annotations[akey]
  260. } else {
  261. klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  262. return ""
  263. }
  264. } else {
  265. klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  266. return ""
  267. }
  268. }
  269. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  270. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  271. return &csvKey{
  272. ProviderID: id,
  273. Labels: l,
  274. }
  275. }
  // csvPVKey identifies a persistent volume for CSV pricing lookups.
  type csvPVKey struct {
  	Labels                 map[string]string
  	ProviderID             string // lookup ID resolved from the provider's PVMapField
  	StorageClassName       string
  	StorageClassParameters map[string]string
  	Name                   string
  	DefaultRegion          string
  }

  // ID always returns the empty string; PV price lookups key on Features instead.
  func (key *csvPVKey) ID() string { return "" }

  // GetStorageClass returns the volume's storage class name.
  func (key *csvPVKey) GetStorageClass() string { return key.StorageClassName }

  // Features returns the PV lookup ID stored in ProviderID.
  func (key *csvPVKey) Features() string { return key.ProviderID }
  293. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  294. id := PVValueFromMapField(c.PVMapField, pv)
  295. return &csvPVKey{
  296. Labels: pv.Labels,
  297. ProviderID: id,
  298. StorageClassName: pv.Spec.StorageClassName,
  299. StorageClassParameters: parameters,
  300. Name: pv.Name,
  301. DefaultRegion: defaultRegion,
  302. }
  303. }
  304. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  305. c.DownloadPricingDataLock.RLock()
  306. defer c.DownloadPricingDataLock.RUnlock()
  307. pricing, ok := c.PricingPV[pvk.Features()]
  308. if !ok {
  309. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  310. return &PV{}, nil
  311. }
  312. return &PV{
  313. Cost: pricing.MarketPriceHourly,
  314. }, nil
  315. }
  316. func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
  317. return &ServiceAccountStatus{
  318. Checks: []*ServiceAccountCheck{},
  319. }
  320. }
  321. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  322. return "", 0.0, nil
  323. }
  324. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  325. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  326. }
  327. func (c *CSVProvider) ParseID(id string) string {
  328. return id
  329. }
  330. func (c *CSVProvider) ParsePVID(id string) string {
  331. return id
  332. }