csvprovider.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "io"
  5. "os"
  6. "strings"
  7. "sync"
  8. v1 "k8s.io/api/core/v1"
  9. "k8s.io/klog"
  10. "github.com/jszwec/csvutil"
  11. )
  12. type CSVProvider struct {
  13. *CustomProvider
  14. CSVLocation string
  15. Pricing map[string]*price
  16. NodeMapField string
  17. PricingPV map[string]*price
  18. PVMapField string
  19. DownloadPricingDataLock sync.RWMutex
  20. }
  21. type price struct {
  22. EndTimestamp string `csv:"EndTimestamp"`
  23. InstanceID string `csv:"InstanceID"`
  24. AssetClass string `csv:"AssetClass"`
  25. InstanceIDField string `csv:"InstanceIDField"`
  26. InstanceType string `csv:"InstanceType"`
  27. MarketPriceHourly string `csv:"MarketPriceHourly"`
  28. Version string `csv:"Version"`
  29. }
  30. func parseMapField(mf string) {
  31. }
  32. func GetCsv(location string) (io.Reader, error) {
  33. return os.Open(location)
  34. }
  35. func (c *CSVProvider) DownloadPricingData() error {
  36. c.DownloadPricingDataLock.Lock()
  37. defer c.DownloadPricingDataLock.Unlock()
  38. pricing := make(map[string]*price)
  39. pvpricing := make(map[string]*price)
  40. header, err := csvutil.Header(price{}, "csv")
  41. if err != nil {
  42. return err
  43. }
  44. fieldsPerRecord := len(header)
  45. csvr, err := GetCsv(c.CSVLocation)
  46. csvReader := csv.NewReader(csvr)
  47. csvReader.Comma = '\t'
  48. csvReader.FieldsPerRecord = fieldsPerRecord
  49. dec, err := csvutil.NewDecoder(csvReader, header...)
  50. if err != nil {
  51. return err
  52. }
  53. for {
  54. p := price{}
  55. err := dec.Decode(&p)
  56. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  57. if err == io.EOF {
  58. break
  59. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  60. rec := dec.Record()
  61. if len(rec) != 1 {
  62. klog.V(2).Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  63. continue
  64. }
  65. if strings.Index(rec[0], "#") == 0 {
  66. continue
  67. } else {
  68. klog.V(3).Infof("skipping non-CSV line: %s", rec)
  69. continue
  70. }
  71. } else if err != nil {
  72. klog.V(2).Infof("Error during spot info decode: %+v", err)
  73. continue
  74. }
  75. klog.V(4).Infof("Found price info %+v", p)
  76. if p.AssetClass == "pv" {
  77. pvpricing[p.InstanceID] = &p
  78. c.PVMapField = p.InstanceIDField
  79. } else if p.AssetClass == "node" {
  80. pricing[p.InstanceID] = &p
  81. c.NodeMapField = p.InstanceIDField
  82. } else {
  83. klog.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  84. pricing[p.InstanceID] = &p
  85. c.NodeMapField = p.InstanceIDField
  86. }
  87. }
  88. c.Pricing = pricing
  89. c.PricingPV = pvpricing
  90. return nil
  91. }
  92. type csvKey struct {
  93. Labels map[string]string
  94. ProviderID string
  95. }
  96. func (k *csvKey) Features() string {
  97. return ""
  98. }
  99. func (k *csvKey) GPUType() string {
  100. return ""
  101. }
  102. func (k *csvKey) ID() string {
  103. return k.ProviderID
  104. }
  105. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  106. c.DownloadPricingDataLock.RLock()
  107. defer c.DownloadPricingDataLock.RUnlock()
  108. if p, ok := c.Pricing[key.ID()]; ok {
  109. return &Node{
  110. Cost: p.MarketPriceHourly,
  111. }, nil
  112. } else {
  113. klog.Infof("Unable to find Node matching %s", key.ID())
  114. return &Node{}, nil
  115. }
  116. }
  117. func NodeValueFromMapField(m string, n *v1.Node) string {
  118. mf := strings.Split(m, ".")
  119. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  120. return n.Spec.ProviderID
  121. } else if len(mf) > 1 && mf[0] == "metadata" {
  122. if mf[1] == "name" {
  123. return n.Name
  124. } else if mf[1] == "labels" {
  125. lkey := strings.Join(mf[2:len(mf)], "")
  126. return n.Labels[lkey]
  127. } else if mf[1] == "annotations" {
  128. akey := strings.Join(mf[2:len(mf)], "")
  129. return n.Annotations[akey]
  130. } else {
  131. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  132. return ""
  133. }
  134. } else {
  135. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
  136. return ""
  137. }
  138. }
  139. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  140. mf := strings.Split(m, ".")
  141. if len(mf) > 1 && mf[0] == "metadata" {
  142. if mf[1] == "name" {
  143. return n.Name
  144. } else if mf[1] == "labels" {
  145. lkey := strings.Join(mf[2:len(mf)], "")
  146. return n.Labels[lkey]
  147. } else if mf[1] == "annotations" {
  148. akey := strings.Join(mf[2:len(mf)], "")
  149. return n.Annotations[akey]
  150. } else {
  151. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  152. return ""
  153. }
  154. } else {
  155. klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  156. return ""
  157. }
  158. }
  159. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  160. id := NodeValueFromMapField(c.NodeMapField, n)
  161. return &csvKey{
  162. ProviderID: id,
  163. Labels: l,
  164. }
  165. }
  166. type csvPVKey struct {
  167. Labels map[string]string
  168. ProviderID string
  169. StorageClassName string
  170. StorageClassParameters map[string]string
  171. Name string
  172. DefaultRegion string
  173. }
  174. func (key *csvPVKey) GetStorageClass() string {
  175. return key.StorageClassName
  176. }
  177. func (key *csvPVKey) Features() string {
  178. return key.ProviderID
  179. }
  180. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  181. id := PVValueFromMapField(c.PVMapField, pv)
  182. return &csvPVKey{
  183. Labels: pv.Labels,
  184. ProviderID: id,
  185. StorageClassName: pv.Spec.StorageClassName,
  186. StorageClassParameters: parameters,
  187. Name: pv.Name,
  188. DefaultRegion: defaultRegion,
  189. }
  190. }
  191. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  192. c.DownloadPricingDataLock.RLock()
  193. defer c.DownloadPricingDataLock.RUnlock()
  194. pricing, ok := c.PricingPV[pvk.Features()]
  195. if !ok {
  196. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  197. return &PV{}, nil
  198. }
  199. return &PV{
  200. Cost: pricing.MarketPriceHourly,
  201. }, nil
  202. }