csvprovider.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/kubecost/cost-model/pkg/env"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "github.com/aws/aws-sdk-go/aws"
  15. "github.com/aws/aws-sdk-go/aws/session"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. "github.com/kubecost/cost-model/pkg/log"
  18. v1 "k8s.io/api/core/v1"
  19. "github.com/jszwec/csvutil"
  20. )
  // refreshMinutes is the delay, in minutes, between the end of one
  // DownloadPricingData run and the start of the next scheduled one.
  const (
  	refreshMinutes = 60
  )
  22. type CSVProvider struct {
  23. *CustomProvider
  24. CSVLocation string
  25. Pricing map[string]*price
  26. NodeClassPricing map[string]float64
  27. NodeClassCount map[string]float64
  28. NodeMapField string
  29. PricingPV map[string]*price
  30. PVMapField string
  31. UsesRegion bool
  32. DownloadPricingDataLock sync.RWMutex
  33. }
  // price is one decoded row of the pricing CSV. Every column is kept as the
  // raw string found in the file; the `csv` tags name the expected columns.
  type price struct {
  	EndTimestamp      string `csv:"EndTimestamp"`
  	InstanceID        string `csv:"InstanceID"` // lookup key (lower-cased when loaded)
  	Region            string `csv:"Region"`     // optional; when set it prefixes the lookup key
  	AssetClass        string `csv:"AssetClass"` // "pv" or "node"; anything else is treated as a node
  	InstanceIDField   string `csv:"InstanceIDField"`
  	InstanceType      string `csv:"InstanceType"`
  	MarketPriceHourly string `csv:"MarketPriceHourly"` // hourly price, parsed as a float for class pricing
  	Version           string `csv:"Version"`
  }
  // GetCsv opens the file at location on the local filesystem and returns a
  // reader over its contents.
  //
  // On failure the returned reader is a true nil interface value, so callers may
  // safely test it with `r == nil`. On success the concrete value is an *os.File;
  // callers that are done with it should close it through io.Closer.
  func GetCsv(location string) (io.Reader, error) {
  	f, err := os.Open(location)
  	if err != nil {
  		// Returning f here would wrap a nil *os.File in a non-nil io.Reader.
  		return nil, err
  	}
  	return f, nil
  }
  47. func (c *CSVProvider) DownloadPricingData() error {
  48. c.DownloadPricingDataLock.Lock()
  49. defer time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  50. defer c.DownloadPricingDataLock.Unlock()
  51. pricing := make(map[string]*price)
  52. nodeclasspricing := make(map[string]float64)
  53. nodeclasscount := make(map[string]float64)
  54. pvpricing := make(map[string]*price)
  55. header, err := csvutil.Header(price{}, "csv")
  56. if err != nil {
  57. return err
  58. }
  59. fieldsPerRecord := len(header)
  60. var csvr io.Reader
  61. var csverr error
  62. if strings.HasPrefix(c.CSVLocation, "s3://") {
  63. region := env.GetCSVRegion()
  64. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  65. endpoint := env.GetCSVEndpoint()
  66. if endpoint != "" {
  67. conf = conf.WithEndpoint(endpoint)
  68. }
  69. s3Client := s3.New(session.New(conf))
  70. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  71. if len(bucketAndKey) == 2 {
  72. out, err := s3Client.GetObject(&s3.GetObjectInput{
  73. Bucket: aws.String(bucketAndKey[0]),
  74. Key: aws.String(bucketAndKey[1]),
  75. })
  76. csverr = err
  77. csvr = out.Body
  78. } else {
  79. c.Pricing = pricing
  80. c.NodeClassPricing = nodeclasspricing
  81. c.NodeClassCount = nodeclasscount
  82. c.PricingPV = pvpricing
  83. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  84. }
  85. } else {
  86. csvr, csverr = GetCsv(c.CSVLocation)
  87. }
  88. if csverr != nil {
  89. log.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  90. c.Pricing = pricing
  91. c.NodeClassPricing = nodeclasspricing
  92. c.NodeClassCount = nodeclasscount
  93. c.PricingPV = pvpricing
  94. return nil
  95. }
  96. csvReader := csv.NewReader(csvr)
  97. csvReader.Comma = ','
  98. csvReader.FieldsPerRecord = fieldsPerRecord
  99. dec, err := csvutil.NewDecoder(csvReader, header...)
  100. if err != nil {
  101. c.Pricing = pricing
  102. c.NodeClassPricing = nodeclasspricing
  103. c.NodeClassCount = nodeclasscount
  104. c.PricingPV = pvpricing
  105. return err
  106. }
  107. for {
  108. p := price{}
  109. err := dec.Decode(&p)
  110. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  111. if err == io.EOF {
  112. break
  113. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  114. rec := dec.Record()
  115. if len(rec) != 1 {
  116. log.Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  117. continue
  118. }
  119. if strings.Index(rec[0], "#") == 0 {
  120. continue
  121. } else {
  122. log.Infof("skipping non-CSV line: %s", rec)
  123. continue
  124. }
  125. } else if err != nil {
  126. log.Infof("Error during spot info decode: %+v", err)
  127. continue
  128. }
  129. log.Infof("Found price info %+v", p)
  130. key := strings.ToLower(p.InstanceID)
  131. if p.Region != "" { // strip the casing from region and add to key.
  132. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  133. c.UsesRegion = true
  134. }
  135. if p.AssetClass == "pv" {
  136. pvpricing[key] = &p
  137. c.PVMapField = p.InstanceIDField
  138. } else if p.AssetClass == "node" {
  139. pricing[key] = &p
  140. classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
  141. cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
  142. if err != nil {
  143. } else {
  144. if _, ok := nodeclasspricing[classKey]; ok {
  145. oldPrice := nodeclasspricing[classKey]
  146. oldCount := nodeclasscount[classKey]
  147. newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
  148. nodeclasscount[classKey] = newPrice
  149. nodeclasscount[classKey]++
  150. } else {
  151. nodeclasspricing[classKey] = cost
  152. nodeclasscount[classKey] = 1
  153. }
  154. }
  155. c.NodeMapField = p.InstanceIDField
  156. } else {
  157. log.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  158. pricing[key] = &p
  159. c.NodeMapField = p.InstanceIDField
  160. }
  161. }
  162. if len(pricing) > 0 {
  163. c.Pricing = pricing
  164. c.NodeClassPricing = nodeclasspricing
  165. c.NodeClassCount = nodeclasscount
  166. c.PricingPV = pvpricing
  167. } else {
  168. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  169. }
  170. return nil
  171. }
  // csvKey identifies a node for CSV price lookups. Its methods expose the
  // provider ID as the key's ID and the labels as the source of its features.
  type csvKey struct {
  	// Labels are the labels from which region and instance type are read.
  	Labels map[string]string
  	// ProviderID is the value returned by ID.
  	ProviderID string
  }
  176. func (k *csvKey) Features() string {
  177. instanceType, _ := util.GetInstanceType(k.Labels)
  178. region, _ := util.GetRegion(k.Labels)
  179. class := "node"
  180. return region + "," + instanceType + "," + class
  181. }
  182. func (k *csvKey) GPUType() string {
  183. return ""
  184. }
  185. func (k *csvKey) ID() string {
  186. return k.ProviderID
  187. }
  188. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  189. c.DownloadPricingDataLock.RLock()
  190. defer c.DownloadPricingDataLock.RUnlock()
  191. if p, ok := c.Pricing[key.ID()]; ok {
  192. return &Node{
  193. Cost: p.MarketPriceHourly,
  194. PricingType: CsvExact,
  195. }, nil
  196. }
  197. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  198. if len(s) == 2 {
  199. if p, ok := c.Pricing[s[1]]; ok {
  200. return &Node{
  201. Cost: p.MarketPriceHourly,
  202. PricingType: CsvExact,
  203. }, nil
  204. }
  205. }
  206. classKey := key.Features() // Use node attributes to try and do a class match
  207. if cost, ok := c.NodeClassPricing[classKey]; ok {
  208. log.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
  209. return &Node{
  210. Cost: fmt.Sprintf("%f", cost),
  211. PricingType: CsvClass,
  212. }, nil
  213. }
  214. return nil, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
  215. }
  216. func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
  217. mf := strings.Split(m, ".")
  218. toReturn := ""
  219. if useRegion {
  220. if region, ok := util.GetRegion(n.Labels); ok {
  221. toReturn = region + ","
  222. } else {
  223. log.Errorf("Getting region based on labels failed")
  224. }
  225. }
  226. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  227. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  228. for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
  229. if matchNum == 2 {
  230. return toReturn + group
  231. }
  232. }
  233. if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
  234. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.Spec.ProviderID, "azure://"))
  235. return toReturn + vmOrScaleSet
  236. }
  237. return toReturn + n.Spec.ProviderID
  238. } else if len(mf) > 1 && mf[0] == "metadata" {
  239. if mf[1] == "name" {
  240. return toReturn + n.Name
  241. } else if mf[1] == "labels" {
  242. lkey := strings.Join(mf[2:len(mf)], "")
  243. return toReturn + n.Labels[lkey]
  244. } else if mf[1] == "annotations" {
  245. akey := strings.Join(mf[2:len(mf)], "")
  246. return toReturn + n.Annotations[akey]
  247. } else {
  248. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  249. return ""
  250. }
  251. } else {
  252. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  253. return ""
  254. }
  255. }
  256. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  257. mf := strings.Split(m, ".")
  258. if len(mf) > 1 && mf[0] == "metadata" {
  259. if mf[1] == "name" {
  260. return n.Name
  261. } else if mf[1] == "labels" {
  262. lkey := strings.Join(mf[2:len(mf)], "")
  263. return n.Labels[lkey]
  264. } else if mf[1] == "annotations" {
  265. akey := strings.Join(mf[2:len(mf)], "")
  266. return n.Annotations[akey]
  267. } else {
  268. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  269. return ""
  270. }
  271. } else if len(mf) > 2 && mf[0] == "spec" {
  272. if mf[1] == "capacity" && mf[2] == "storage" {
  273. skey := n.Spec.Capacity["storage"]
  274. return skey.String()
  275. } else {
  276. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  277. return ""
  278. }
  279. } else {
  280. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  281. return ""
  282. }
  283. }
  284. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  285. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  286. return &csvKey{
  287. ProviderID: id,
  288. Labels: l,
  289. }
  290. }
  // csvPVKey identifies a persistent volume for CSV price lookups.
  type csvPVKey struct {
  	// Labels are the volume's labels.
  	Labels map[string]string
  	// ProviderID is the lookup value; it is what Features returns.
  	ProviderID string
  	// StorageClassName is what GetStorageClass returns.
  	StorageClassName string
  	// StorageClassParameters are the parameters handed to GetPVKey.
  	StorageClassParameters map[string]string
  	// Name is the volume's name.
  	Name string
  	// DefaultRegion is the default region handed to GetPVKey.
  	DefaultRegion string
  }
  299. func (key *csvPVKey) ID() string {
  300. return ""
  301. }
  302. func (key *csvPVKey) GetStorageClass() string {
  303. return key.StorageClassName
  304. }
  305. func (key *csvPVKey) Features() string {
  306. return key.ProviderID
  307. }
  308. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  309. id := PVValueFromMapField(c.PVMapField, pv)
  310. return &csvPVKey{
  311. Labels: pv.Labels,
  312. ProviderID: id,
  313. StorageClassName: pv.Spec.StorageClassName,
  314. StorageClassParameters: parameters,
  315. Name: pv.Name,
  316. DefaultRegion: defaultRegion,
  317. }
  318. }
  319. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  320. c.DownloadPricingDataLock.RLock()
  321. defer c.DownloadPricingDataLock.RUnlock()
  322. pricing, ok := c.PricingPV[pvk.Features()]
  323. if !ok {
  324. log.Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  325. return &PV{}, nil
  326. }
  327. return &PV{
  328. Cost: pricing.MarketPriceHourly,
  329. }, nil
  330. }
  331. func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
  332. return &ServiceAccountStatus{
  333. Checks: []*ServiceAccountCheck{},
  334. }
  335. }
  336. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  337. return "", 0.0, nil
  338. }
  339. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  340. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  341. }
  342. func (c *CSVProvider) Regions() []string {
  343. return []string{}
  344. }