csvprovider.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "regexp"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/aws/aws-sdk-go/aws"
  12. "github.com/aws/aws-sdk-go/aws/session"
  13. "github.com/aws/aws-sdk-go/service/s3"
  14. "github.com/kubecost/cost-model/pkg/log"
  15. v1 "k8s.io/api/core/v1"
  16. "k8s.io/klog"
  17. "github.com/jszwec/csvutil"
  18. )
  19. const refreshMinutes = 60
  20. type CSVProvider struct {
  21. *CustomProvider
  22. CSVLocation string
  23. Pricing map[string]*price
  24. NodeMapField string
  25. PricingPV map[string]*price
  26. PVMapField string
  27. UsesRegion bool
  28. DownloadPricingDataLock sync.RWMutex
  29. }
  30. type price struct {
  31. EndTimestamp string `csv:"EndTimestamp"`
  32. InstanceID string `csv:"InstanceID"`
  33. Region string `csv:"Region"`
  34. AssetClass string `csv:"AssetClass"`
  35. InstanceIDField string `csv:"InstanceIDField"`
  36. InstanceType string `csv:"InstanceType"`
  37. MarketPriceHourly string `csv:"MarketPriceHourly"`
  38. Version string `csv:"Version"`
  39. }
  40. func GetCsv(location string) (io.Reader, error) {
  41. return os.Open(location)
  42. }
  43. func (c *CSVProvider) DownloadPricingData() error {
  44. c.DownloadPricingDataLock.Lock()
  45. defer c.DownloadPricingDataLock.Unlock()
  46. pricing := make(map[string]*price)
  47. pvpricing := make(map[string]*price)
  48. header, err := csvutil.Header(price{}, "csv")
  49. if err != nil {
  50. return err
  51. }
  52. fieldsPerRecord := len(header)
  53. var csvr io.Reader
  54. var csverr error
  55. if strings.HasPrefix(c.CSVLocation, "s3://") {
  56. region := os.Getenv("CSV_REGION")
  57. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  58. s3Client := s3.New(session.New(conf))
  59. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  60. if len(bucketAndKey) == 2 {
  61. out, err := s3Client.GetObject(&s3.GetObjectInput{
  62. Bucket: aws.String(bucketAndKey[0]),
  63. Key: aws.String(bucketAndKey[1]),
  64. })
  65. csverr = err
  66. csvr = out.Body
  67. } else {
  68. c.Pricing = pricing
  69. c.PricingPV = pvpricing
  70. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  71. }
  72. } else {
  73. csvr, csverr = GetCsv(c.CSVLocation)
  74. }
  75. if csverr != nil {
  76. klog.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  77. c.Pricing = pricing
  78. c.PricingPV = pvpricing
  79. return nil
  80. }
  81. csvReader := csv.NewReader(csvr)
  82. csvReader.Comma = ','
  83. csvReader.FieldsPerRecord = fieldsPerRecord
  84. dec, err := csvutil.NewDecoder(csvReader, header...)
  85. if err != nil {
  86. c.Pricing = pricing
  87. c.PricingPV = pvpricing
  88. return err
  89. }
  90. for {
  91. p := price{}
  92. err := dec.Decode(&p)
  93. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  94. if err == io.EOF {
  95. break
  96. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  97. rec := dec.Record()
  98. if len(rec) != 1 {
  99. klog.V(2).Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  100. continue
  101. }
  102. if strings.Index(rec[0], "#") == 0 {
  103. continue
  104. } else {
  105. klog.V(3).Infof("skipping non-CSV line: %s", rec)
  106. continue
  107. }
  108. } else if err != nil {
  109. klog.V(2).Infof("Error during spot info decode: %+v", err)
  110. continue
  111. }
  112. klog.V(4).Infof("Found price info %+v", p)
  113. key := p.InstanceID
  114. if p.Region != "" { // strip the casing from region and add to key.
  115. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), p.InstanceID)
  116. c.UsesRegion = true
  117. }
  118. if p.AssetClass == "pv" {
  119. pvpricing[key] = &p
  120. c.PVMapField = p.InstanceIDField
  121. } else if p.AssetClass == "node" {
  122. pricing[key] = &p
  123. c.NodeMapField = p.InstanceIDField
  124. } else {
  125. klog.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  126. pricing[key] = &p
  127. c.NodeMapField = p.InstanceIDField
  128. }
  129. }
  130. if len(pricing) > 0 {
  131. c.Pricing = pricing
  132. c.PricingPV = pvpricing
  133. } else {
  134. log.DedupedWarningf("No data received from csv at %s", 5, c.CSVLocation)
  135. }
  136. time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  137. return nil
  138. }
  139. type csvKey struct {
  140. Labels map[string]string
  141. ProviderID string
  142. }
  143. func (k *csvKey) Features() string {
  144. return ""
  145. }
  146. func (k *csvKey) GPUType() string {
  147. return ""
  148. }
  149. func (k *csvKey) ID() string {
  150. return k.ProviderID
  151. }
  152. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  153. c.DownloadPricingDataLock.RLock()
  154. defer c.DownloadPricingDataLock.RUnlock()
  155. if p, ok := c.Pricing[key.ID()]; ok {
  156. return &Node{
  157. Cost: p.MarketPriceHourly,
  158. }, nil
  159. }
  160. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  161. if len(s) == 2 {
  162. if p, ok := c.Pricing[s[1]]; ok {
  163. return &Node{
  164. Cost: p.MarketPriceHourly,
  165. }, nil
  166. }
  167. }
  168. return nil, fmt.Errorf("Unable to find Node matching %s", key.ID())
  169. }
  170. func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
  171. mf := strings.Split(m, ".")
  172. toReturn := ""
  173. if useRegion {
  174. toReturn = n.Labels[v1.LabelZoneRegion] + ","
  175. }
  176. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  177. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  178. for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
  179. if matchNum == 2 {
  180. return toReturn + group
  181. }
  182. }
  183. if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
  184. vmOrScaleSet := strings.TrimPrefix(n.Spec.ProviderID, "azure://")
  185. return toReturn + vmOrScaleSet
  186. }
  187. return toReturn + n.Spec.ProviderID
  188. } else if len(mf) > 1 && mf[0] == "metadata" {
  189. if mf[1] == "name" {
  190. return toReturn + n.Name
  191. } else if mf[1] == "labels" {
  192. lkey := strings.Join(mf[2:len(mf)], "")
  193. return toReturn + n.Labels[lkey]
  194. } else if mf[1] == "annotations" {
  195. akey := strings.Join(mf[2:len(mf)], "")
  196. return toReturn + n.Annotations[akey]
  197. } else {
  198. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  199. return ""
  200. }
  201. } else {
  202. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  203. return ""
  204. }
  205. }
  206. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  207. mf := strings.Split(m, ".")
  208. if len(mf) > 1 && mf[0] == "metadata" {
  209. if mf[1] == "name" {
  210. return n.Name
  211. } else if mf[1] == "labels" {
  212. lkey := strings.Join(mf[2:len(mf)], "")
  213. return n.Labels[lkey]
  214. } else if mf[1] == "annotations" {
  215. akey := strings.Join(mf[2:len(mf)], "")
  216. return n.Annotations[akey]
  217. } else {
  218. klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  219. return ""
  220. }
  221. } else {
  222. klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  223. return ""
  224. }
  225. }
  226. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  227. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  228. return &csvKey{
  229. ProviderID: id,
  230. Labels: l,
  231. }
  232. }
  233. type csvPVKey struct {
  234. Labels map[string]string
  235. ProviderID string
  236. StorageClassName string
  237. StorageClassParameters map[string]string
  238. Name string
  239. DefaultRegion string
  240. }
  241. func (key *csvPVKey) GetStorageClass() string {
  242. return key.StorageClassName
  243. }
  244. func (key *csvPVKey) Features() string {
  245. return key.ProviderID
  246. }
  247. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  248. id := PVValueFromMapField(c.PVMapField, pv)
  249. return &csvPVKey{
  250. Labels: pv.Labels,
  251. ProviderID: id,
  252. StorageClassName: pv.Spec.StorageClassName,
  253. StorageClassParameters: parameters,
  254. Name: pv.Name,
  255. DefaultRegion: defaultRegion,
  256. }
  257. }
  258. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  259. c.DownloadPricingDataLock.RLock()
  260. defer c.DownloadPricingDataLock.RUnlock()
  261. pricing, ok := c.PricingPV[pvk.Features()]
  262. if !ok {
  263. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  264. return &PV{}, nil
  265. }
  266. return &PV{
  267. Cost: pricing.MarketPriceHourly,
  268. }, nil
  269. }