csvprovider.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "strings"
  8. "sync"
  9. v1 "k8s.io/api/core/v1"
  10. "k8s.io/klog"
  11. "github.com/jszwec/csvutil"
  12. )
  13. type CSVProvider struct {
  14. *CustomProvider
  15. CSVLocation string
  16. Pricing map[string]*price
  17. NodeMapField string
  18. PricingPV map[string]*price
  19. PVMapField string
  20. DownloadPricingDataLock sync.RWMutex
  21. }
  22. type price struct {
  23. EndTimestamp string `csv:"EndTimestamp"`
  24. InstanceID string `csv:"InstanceID"`
  25. AssetClass string `csv:"AssetClass"`
  26. InstanceIDField string `csv:"InstanceIDField"`
  27. InstanceType string `csv:"InstanceType"`
  28. MarketPriceHourly string `csv:"MarketPriceHourly"`
  29. Version string `csv:"Version"`
  30. }
  31. func parseMapField(mf string) {
  32. }
  33. func GetCsv(location string) (io.Reader, error) {
  34. return os.Open(location)
  35. }
  36. func (c *CSVProvider) DownloadPricingData() error {
  37. c.DownloadPricingDataLock.Lock()
  38. defer c.DownloadPricingDataLock.Unlock()
  39. pricing := make(map[string]*price)
  40. pvpricing := make(map[string]*price)
  41. header, err := csvutil.Header(price{}, "csv")
  42. if err != nil {
  43. return err
  44. }
  45. fieldsPerRecord := len(header)
  46. csvr, err := GetCsv(c.CSVLocation)
  47. csvReader := csv.NewReader(csvr)
  48. csvReader.Comma = ','
  49. csvReader.FieldsPerRecord = fieldsPerRecord
  50. dec, err := csvutil.NewDecoder(csvReader, header...)
  51. if err != nil {
  52. return err
  53. }
  54. for {
  55. p := price{}
  56. err := dec.Decode(&p)
  57. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  58. if err == io.EOF {
  59. break
  60. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  61. rec := dec.Record()
  62. if len(rec) != 1 {
  63. klog.V(2).Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  64. continue
  65. }
  66. if strings.Index(rec[0], "#") == 0 {
  67. continue
  68. } else {
  69. klog.V(3).Infof("skipping non-CSV line: %s", rec)
  70. continue
  71. }
  72. } else if err != nil {
  73. klog.V(2).Infof("Error during spot info decode: %+v", err)
  74. continue
  75. }
  76. klog.V(4).Infof("Found price info %+v", p)
  77. if p.AssetClass == "pv" {
  78. pvpricing[p.InstanceID] = &p
  79. c.PVMapField = p.InstanceIDField
  80. } else if p.AssetClass == "node" {
  81. pricing[p.InstanceID] = &p
  82. c.NodeMapField = p.InstanceIDField
  83. } else {
  84. klog.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  85. pricing[p.InstanceID] = &p
  86. c.NodeMapField = p.InstanceIDField
  87. }
  88. }
  89. if len(pricing) > 0 {
  90. c.Pricing = pricing
  91. c.PricingPV = pvpricing
  92. } else {
  93. klog.Infof("[WARNING] No data received from csv")
  94. }
  95. return nil
  96. }
  97. type csvKey struct {
  98. Labels map[string]string
  99. ProviderID string
  100. }
  101. func (k *csvKey) Features() string {
  102. return ""
  103. }
  104. func (k *csvKey) GPUType() string {
  105. return ""
  106. }
  107. func (k *csvKey) ID() string {
  108. return k.ProviderID
  109. }
  110. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  111. c.DownloadPricingDataLock.RLock()
  112. defer c.DownloadPricingDataLock.RUnlock()
  113. if p, ok := c.Pricing[key.ID()]; ok {
  114. return &Node{
  115. Cost: p.MarketPriceHourly,
  116. }, nil
  117. }
  118. return nil, fmt.Errorf("Unable to find Node matching %s", key.ID())
  119. }
  120. func NodeValueFromMapField(m string, n *v1.Node) string {
  121. mf := strings.Split(m, ".")
  122. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  123. return n.Spec.ProviderID
  124. } else if len(mf) > 1 && mf[0] == "metadata" {
  125. if mf[1] == "name" {
  126. return n.Name
  127. } else if mf[1] == "labels" {
  128. lkey := strings.Join(mf[2:len(mf)], "")
  129. return n.Labels[lkey]
  130. } else if mf[1] == "annotations" {
  131. akey := strings.Join(mf[2:len(mf)], "")
  132. return n.Annotations[akey]
  133. } else {
  134. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  135. return ""
  136. }
  137. } else {
  138. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  139. return ""
  140. }
  141. }
  142. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  143. mf := strings.Split(m, ".")
  144. if len(mf) > 1 && mf[0] == "metadata" {
  145. if mf[1] == "name" {
  146. return n.Name
  147. } else if mf[1] == "labels" {
  148. lkey := strings.Join(mf[2:len(mf)], "")
  149. return n.Labels[lkey]
  150. } else if mf[1] == "annotations" {
  151. akey := strings.Join(mf[2:len(mf)], "")
  152. return n.Annotations[akey]
  153. } else {
  154. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  155. return ""
  156. }
  157. } else {
  158. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  159. return ""
  160. }
  161. }
  162. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  163. id := NodeValueFromMapField(c.NodeMapField, n)
  164. return &csvKey{
  165. ProviderID: id,
  166. Labels: l,
  167. }
  168. }
  169. type csvPVKey struct {
  170. Labels map[string]string
  171. ProviderID string
  172. StorageClassName string
  173. StorageClassParameters map[string]string
  174. Name string
  175. DefaultRegion string
  176. }
  177. func (key *csvPVKey) GetStorageClass() string {
  178. return key.StorageClassName
  179. }
  180. func (key *csvPVKey) Features() string {
  181. return key.ProviderID
  182. }
  183. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  184. id := PVValueFromMapField(c.PVMapField, pv)
  185. return &csvPVKey{
  186. Labels: pv.Labels,
  187. ProviderID: id,
  188. StorageClassName: pv.Spec.StorageClassName,
  189. StorageClassParameters: parameters,
  190. Name: pv.Name,
  191. DefaultRegion: defaultRegion,
  192. }
  193. }
  194. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  195. c.DownloadPricingDataLock.RLock()
  196. defer c.DownloadPricingDataLock.RUnlock()
  197. pricing, ok := c.PricingPV[pvk.Features()]
  198. if !ok {
  199. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  200. return &PV{}, nil
  201. }
  202. return &PV{
  203. Cost: pricing.MarketPriceHourly,
  204. }, nil
  205. }