2
0

csvprovider.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "regexp"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/kubecost/cost-model/pkg/env"
  12. "github.com/aws/aws-sdk-go/aws"
  13. "github.com/aws/aws-sdk-go/aws/session"
  14. "github.com/aws/aws-sdk-go/service/s3"
  15. "github.com/kubecost/cost-model/pkg/log"
  16. v1 "k8s.io/api/core/v1"
  17. "k8s.io/klog"
  18. "github.com/jszwec/csvutil"
  19. )
  const (
  	// refreshMinutes is the delay, in minutes, after which a completed
  	// DownloadPricingData run schedules itself again.
  	refreshMinutes = 60
  )
  21. type CSVProvider struct {
  22. *CustomProvider
  23. CSVLocation string
  24. Pricing map[string]*price
  25. NodeMapField string
  26. PricingPV map[string]*price
  27. PVMapField string
  28. UsesRegion bool
  29. DownloadPricingDataLock sync.RWMutex
  30. }
  // price is one decoded row of the pricing CSV. The csv tags name the columns
  // and, through csvutil.Header, also define the expected column order and
  // count.
  //
  // Within this file only Region, InstanceID, AssetClass, InstanceIDField and
  // MarketPriceHourly are read; the remaining columns are decoded but unused
  // here.
  type price struct {
  	EndTimestamp      string `csv:"EndTimestamp"`
  	InstanceID        string `csv:"InstanceID"`        // combined with Region to form the lookup key
  	Region            string `csv:"Region"`            // optional; when set the key becomes "region,instanceid"
  	AssetClass        string `csv:"AssetClass"`        // "pv" or "node"; anything else is treated as a node
  	InstanceIDField   string `csv:"InstanceIDField"`   // e.g. "spec.providerID" or "metadata.name"
  	InstanceType      string `csv:"InstanceType"`
  	MarketPriceHourly string `csv:"MarketPriceHourly"` // copied verbatim into Node.Cost / PV.Cost
  	Version           string `csv:"Version"`
  }
  // GetCsv opens the file at location for reading and returns it as an
  // io.Reader.
  //
  // On success the returned reader is an *os.File, so callers that want to
  // release the descriptor can type-assert it to io.Closer. On failure the
  // reader is nil and the error from os.Open is returned unchanged.
  func GetCsv(location string) (io.Reader, error) {
  	f, err := os.Open(location)
  	if err != nil {
  		// Return an untyped nil: passing the nil *os.File straight through
  		// would wrap it in a non-nil io.Reader interface value.
  		return nil, err
  	}
  	return f, nil
  }
  44. func (c *CSVProvider) DownloadPricingData() error {
  45. c.DownloadPricingDataLock.Lock()
  46. defer c.DownloadPricingDataLock.Unlock()
  47. pricing := make(map[string]*price)
  48. pvpricing := make(map[string]*price)
  49. header, err := csvutil.Header(price{}, "csv")
  50. if err != nil {
  51. return err
  52. }
  53. fieldsPerRecord := len(header)
  54. var csvr io.Reader
  55. var csverr error
  56. if strings.HasPrefix(c.CSVLocation, "s3://") {
  57. region := env.GetCSVRegion()
  58. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  59. s3Client := s3.New(session.New(conf))
  60. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  61. if len(bucketAndKey) == 2 {
  62. out, err := s3Client.GetObject(&s3.GetObjectInput{
  63. Bucket: aws.String(bucketAndKey[0]),
  64. Key: aws.String(bucketAndKey[1]),
  65. })
  66. csverr = err
  67. csvr = out.Body
  68. } else {
  69. c.Pricing = pricing
  70. c.PricingPV = pvpricing
  71. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  72. }
  73. } else {
  74. csvr, csverr = GetCsv(c.CSVLocation)
  75. }
  76. if csverr != nil {
  77. klog.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  78. c.Pricing = pricing
  79. c.PricingPV = pvpricing
  80. return nil
  81. }
  82. csvReader := csv.NewReader(csvr)
  83. csvReader.Comma = ','
  84. csvReader.FieldsPerRecord = fieldsPerRecord
  85. dec, err := csvutil.NewDecoder(csvReader, header...)
  86. if err != nil {
  87. c.Pricing = pricing
  88. c.PricingPV = pvpricing
  89. return err
  90. }
  91. for {
  92. p := price{}
  93. err := dec.Decode(&p)
  94. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  95. if err == io.EOF {
  96. break
  97. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  98. rec := dec.Record()
  99. if len(rec) != 1 {
  100. klog.V(2).Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  101. continue
  102. }
  103. if strings.Index(rec[0], "#") == 0 {
  104. continue
  105. } else {
  106. klog.V(3).Infof("skipping non-CSV line: %s", rec)
  107. continue
  108. }
  109. } else if err != nil {
  110. klog.V(2).Infof("Error during spot info decode: %+v", err)
  111. continue
  112. }
  113. klog.V(4).Infof("Found price info %+v", p)
  114. key := strings.ToLower(p.InstanceID)
  115. if p.Region != "" { // strip the casing from region and add to key.
  116. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  117. c.UsesRegion = true
  118. }
  119. if p.AssetClass == "pv" {
  120. pvpricing[key] = &p
  121. c.PVMapField = p.InstanceIDField
  122. } else if p.AssetClass == "node" {
  123. pricing[key] = &p
  124. c.NodeMapField = p.InstanceIDField
  125. } else {
  126. klog.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  127. pricing[key] = &p
  128. c.NodeMapField = p.InstanceIDField
  129. }
  130. }
  131. if len(pricing) > 0 {
  132. c.Pricing = pricing
  133. c.PricingPV = pvpricing
  134. } else {
  135. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  136. }
  137. time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  138. return nil
  139. }
  // csvKey is the node lookup key produced by CSVProvider.GetKey.
  type csvKey struct {
  	// Labels are the labels passed to GetKey; they are stored but not read
  	// by any method in this file.
  	Labels map[string]string

  	// ProviderID is the value derived from the node by NodeValueFromMapField
  	// and is what ID returns.
  	ProviderID string
  }
  144. func (k *csvKey) Features() string {
  145. return ""
  146. }
  147. func (k *csvKey) GPUType() string {
  148. return ""
  149. }
  150. func (k *csvKey) ID() string {
  151. return k.ProviderID
  152. }
  153. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  154. c.DownloadPricingDataLock.RLock()
  155. defer c.DownloadPricingDataLock.RUnlock()
  156. if p, ok := c.Pricing[key.ID()]; ok {
  157. return &Node{
  158. Cost: p.MarketPriceHourly,
  159. }, nil
  160. }
  161. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  162. if len(s) == 2 {
  163. if p, ok := c.Pricing[s[1]]; ok {
  164. return &Node{
  165. Cost: p.MarketPriceHourly,
  166. }, nil
  167. }
  168. }
  169. return nil, fmt.Errorf("Unable to find Node matching %s", key.ID())
  170. }
  171. func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
  172. mf := strings.Split(m, ".")
  173. toReturn := ""
  174. if useRegion {
  175. toReturn = n.Labels[v1.LabelZoneRegion] + ","
  176. }
  177. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  178. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  179. for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
  180. if matchNum == 2 {
  181. return toReturn + group
  182. }
  183. }
  184. if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
  185. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.Spec.ProviderID, "azure://"))
  186. return toReturn + vmOrScaleSet
  187. }
  188. return toReturn + n.Spec.ProviderID
  189. } else if len(mf) > 1 && mf[0] == "metadata" {
  190. if mf[1] == "name" {
  191. return toReturn + n.Name
  192. } else if mf[1] == "labels" {
  193. lkey := strings.Join(mf[2:len(mf)], "")
  194. return toReturn + n.Labels[lkey]
  195. } else if mf[1] == "annotations" {
  196. akey := strings.Join(mf[2:len(mf)], "")
  197. return toReturn + n.Annotations[akey]
  198. } else {
  199. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  200. return ""
  201. }
  202. } else {
  203. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  204. return ""
  205. }
  206. }
  207. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  208. mf := strings.Split(m, ".")
  209. if len(mf) > 1 && mf[0] == "metadata" {
  210. if mf[1] == "name" {
  211. return n.Name
  212. } else if mf[1] == "labels" {
  213. lkey := strings.Join(mf[2:len(mf)], "")
  214. return n.Labels[lkey]
  215. } else if mf[1] == "annotations" {
  216. akey := strings.Join(mf[2:len(mf)], "")
  217. return n.Annotations[akey]
  218. } else {
  219. klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  220. return ""
  221. }
  222. } else {
  223. klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  224. return ""
  225. }
  226. }
  227. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  228. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  229. return &csvKey{
  230. ProviderID: id,
  231. Labels: l,
  232. }
  233. }
  // csvPVKey is the persistent-volume lookup key produced by
  // CSVProvider.GetPVKey.
  type csvPVKey struct {
  	// Labels are the volume's labels.
  	Labels map[string]string

  	// ProviderID is the value derived from the volume by
  	// PVValueFromMapField; Features returns it.
  	ProviderID string

  	// StorageClassName is the volume's Spec.StorageClassName.
  	StorageClassName string

  	// StorageClassParameters are the parameters passed to GetPVKey.
  	StorageClassParameters map[string]string

  	// Name is the volume's name.
  	Name string

  	// DefaultRegion is the default region passed to GetPVKey.
  	DefaultRegion string
  }
  242. func (key *csvPVKey) ID() string {
  243. return ""
  244. }
  245. func (key *csvPVKey) GetStorageClass() string {
  246. return key.StorageClassName
  247. }
  248. func (key *csvPVKey) Features() string {
  249. return key.ProviderID
  250. }
  251. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  252. id := PVValueFromMapField(c.PVMapField, pv)
  253. return &csvPVKey{
  254. Labels: pv.Labels,
  255. ProviderID: id,
  256. StorageClassName: pv.Spec.StorageClassName,
  257. StorageClassParameters: parameters,
  258. Name: pv.Name,
  259. DefaultRegion: defaultRegion,
  260. }
  261. }
  262. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  263. c.DownloadPricingDataLock.RLock()
  264. defer c.DownloadPricingDataLock.RUnlock()
  265. pricing, ok := c.PricingPV[pvk.Features()]
  266. if !ok {
  267. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  268. return &PV{}, nil
  269. }
  270. return &PV{
  271. Cost: pricing.MarketPriceHourly,
  272. }, nil
  273. }
  274. func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
  275. return &ServiceAccountStatus{
  276. Checks: []*ServiceAccountCheck{},
  277. }
  278. }
  279. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  280. return "", 0.0, nil
  281. }
  282. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  283. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  284. }
  285. func (c *CSVProvider) ParseID(id string) string {
  286. return id
  287. }
  288. func (c *CSVProvider) ParsePVID(id string) string {
  289. return id
  290. }