csvprovider.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. package provider
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/opencost/opencost/core/pkg/clustercache"
  13. "github.com/opencost/opencost/core/pkg/util"
  14. "github.com/opencost/opencost/pkg/cloud/models"
  15. "github.com/opencost/opencost/pkg/env"
  16. "github.com/aws/aws-sdk-go/aws"
  17. "github.com/aws/aws-sdk-go/aws/session"
  18. "github.com/aws/aws-sdk-go/service/s3"
  19. "github.com/jszwec/csvutil"
  20. "github.com/opencost/opencost/core/pkg/log"
  21. )
// refreshMinutes is the delay, in minutes, after which DownloadPricingData
// schedules its next run.
const refreshMinutes = 60

var (
	// provIdRx matches provider IDs that start with "aws:///" and captures the
	// two slash-separated segments that follow the prefix.
	provIdRx = regexp.MustCompile("aws:///([^/]+)/([^/]+)")
)
// CSVProvider is a pricing provider whose price tables are loaded from a CSV
// file by DownloadPricingData. It embeds *CustomProvider.
type CSVProvider struct {
	*CustomProvider
	// CSVLocation is where the CSV is read from: a local path, or a location
	// starting with "s3://".
	CSVLocation string
	// Pricing holds node rows keyed by lower-cased InstanceID, prefixed with the
	// lower-cased region and a comma when the row carries a Region.
	Pricing map[string]*price
	// NodeClassPricing holds a per-class hourly price keyed by
	// "Region,InstanceType,AssetClass"; nodePricing falls back to it when no
	// exact row matches.
	NodeClassPricing map[string]float64
	// NodeClassCount is the per-class counter kept alongside NodeClassPricing.
	NodeClassCount map[string]float64
	// NodeMapField is the InstanceIDField of the most recently read node row;
	// GetKey uses it to pick which node attribute identifies a node.
	NodeMapField string
	// PricingPV holds "pv" rows, keyed the same way as Pricing.
	PricingPV map[string]*price
	// PVMapField is the InstanceIDField of the most recently read "pv" row.
	PVMapField string
	// GPUClassPricing holds "gpu" rows, keyed the same way as Pricing.
	GPUClassPricing map[string]*price
	// GPULabelPricing holds "gpulabel" rows keyed by "InstanceIDField=InstanceID".
	GPULabelPricing map[string]*price
	GPUMapFields    []string // Fields in a node's labels that represent the GPU class.
	// UsesRegion is set to true once any row with a non-empty Region is read.
	UsesRegion bool
	// DownloadPricingDataLock is held for writing by DownloadPricingData and for
	// reading by NodePricing and PVPricing.
	DownloadPricingDataLock sync.RWMutex
}
// price is one decoded row of the pricing CSV. Every column is kept as a
// string; the csv tags name the columns and also define the header that
// DownloadPricingData hands to the decoder.
type price struct {
	// EndTimestamp is decoded but not read by the code visible in this file.
	EndTimestamp string `csv:"EndTimestamp"`
	// InstanceID is the identifier a row is matched on; it is lower-cased when
	// used as a map key.
	InstanceID string `csv:"InstanceID"`
	// Region, when non-empty, is lower-cased and prepended to the map key.
	Region string `csv:"Region"`
	// AssetClass selects the table the row goes to: "pv", "node", "gpu" or
	// "gpulabel". Any other value is treated as a node row.
	AssetClass string `csv:"AssetClass"`
	// InstanceIDField is a dotted path such as "spec.providerID" or
	// "metadata.labels.<key>" naming the object attribute compared with InstanceID.
	InstanceIDField string `csv:"InstanceIDField"`
	// InstanceType is part of the class key used for class-level node pricing.
	InstanceType string `csv:"InstanceType"`
	// MarketPriceHourly is the hourly price, parsed as a float where arithmetic
	// is needed.
	MarketPriceHourly string `csv:"MarketPriceHourly"`
	// Version is decoded but not read by the code visible in this file.
	Version string `csv:"Version"`
}
  51. func GetCsv(location string) (io.Reader, error) {
  52. return os.Open(location)
  53. }
  54. func (c *CSVProvider) DownloadPricingData() error {
  55. c.DownloadPricingDataLock.Lock()
  56. defer time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  57. defer c.DownloadPricingDataLock.Unlock()
  58. pricing := make(map[string]*price)
  59. nodeclasspricing := make(map[string]float64)
  60. nodeclasscount := make(map[string]float64)
  61. pvpricing := make(map[string]*price)
  62. gpupricing := make(map[string]*price)
  63. gpulabelpricing := make(map[string]*price)
  64. c.GPUMapFields = make([]string, 0, 1)
  65. header, err := csvutil.Header(price{}, "csv")
  66. if err != nil {
  67. return err
  68. }
  69. fieldsPerRecord := len(header)
  70. var csvr io.Reader
  71. var csverr error
  72. if strings.HasPrefix(c.CSVLocation, "s3://") {
  73. region := env.GetCSVRegion()
  74. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  75. endpoint := env.GetCSVEndpoint()
  76. if endpoint != "" {
  77. conf = conf.WithEndpoint(endpoint)
  78. }
  79. s3Client := s3.New(session.New(conf))
  80. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  81. if len(bucketAndKey) == 2 {
  82. out, err := s3Client.GetObject(&s3.GetObjectInput{
  83. Bucket: aws.String(bucketAndKey[0]),
  84. Key: aws.String(bucketAndKey[1]),
  85. })
  86. csverr = err
  87. csvr = out.Body
  88. } else {
  89. c.Pricing = pricing
  90. c.NodeClassPricing = nodeclasspricing
  91. c.NodeClassCount = nodeclasscount
  92. c.PricingPV = pvpricing
  93. c.GPUClassPricing = gpupricing
  94. c.GPULabelPricing = gpulabelpricing
  95. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  96. }
  97. } else {
  98. csvr, csverr = GetCsv(c.CSVLocation)
  99. }
  100. if csverr != nil {
  101. log.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  102. c.Pricing = pricing
  103. c.NodeClassPricing = nodeclasspricing
  104. c.NodeClassCount = nodeclasscount
  105. c.PricingPV = pvpricing
  106. c.GPUClassPricing = gpupricing
  107. c.GPULabelPricing = gpulabelpricing
  108. return nil
  109. }
  110. csvReader := csv.NewReader(csvr)
  111. csvReader.Comma = ','
  112. csvReader.FieldsPerRecord = fieldsPerRecord
  113. dec, err := csvutil.NewDecoder(csvReader, header...)
  114. if err != nil {
  115. c.Pricing = pricing
  116. c.NodeClassPricing = nodeclasspricing
  117. c.NodeClassCount = nodeclasscount
  118. c.PricingPV = pvpricing
  119. c.GPUClassPricing = gpupricing
  120. c.GPULabelPricing = gpulabelpricing
  121. return err
  122. }
  123. for {
  124. p := price{}
  125. err := dec.Decode(&p)
  126. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  127. if err == io.EOF {
  128. break
  129. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  130. rec := dec.Record()
  131. if len(rec) != 1 {
  132. log.Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  133. continue
  134. }
  135. if strings.Index(rec[0], "#") == 0 {
  136. continue
  137. } else {
  138. log.Infof("skipping non-CSV line: %s", rec)
  139. continue
  140. }
  141. } else if err != nil {
  142. log.Infof("Error during spot info decode: %+v", err)
  143. continue
  144. }
  145. log.Infof("Found price info %+v", p)
  146. key := strings.ToLower(p.InstanceID)
  147. if p.Region != "" { // strip the casing from region and add to key.
  148. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  149. c.UsesRegion = true
  150. }
  151. if p.AssetClass == "pv" {
  152. pvpricing[key] = &p
  153. c.PVMapField = p.InstanceIDField
  154. } else if p.AssetClass == "node" {
  155. pricing[key] = &p
  156. classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
  157. cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
  158. if err != nil {
  159. } else {
  160. if _, ok := nodeclasspricing[classKey]; ok {
  161. oldPrice := nodeclasspricing[classKey]
  162. oldCount := nodeclasscount[classKey]
  163. newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
  164. nodeclasscount[classKey] = newPrice
  165. nodeclasscount[classKey]++
  166. } else {
  167. nodeclasspricing[classKey] = cost
  168. nodeclasscount[classKey] = 1
  169. }
  170. }
  171. c.NodeMapField = p.InstanceIDField
  172. } else if p.AssetClass == "gpu" {
  173. gpupricing[key] = &p
  174. c.GPUMapFields = append(c.GPUMapFields, strings.ToLower(p.InstanceIDField))
  175. } else if p.AssetClass == "gpulabel" {
  176. labelKeyValue := p.InstanceIDField + "=" + p.InstanceID
  177. gpulabelpricing[labelKeyValue] = &p
  178. } else {
  179. log.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  180. pricing[key] = &p
  181. c.NodeMapField = p.InstanceIDField
  182. }
  183. }
  184. if len(pricing) > 0 || len(gpupricing) > 0 || len(gpulabelpricing) > 0 {
  185. c.Pricing = pricing
  186. c.NodeClassPricing = nodeclasspricing
  187. c.NodeClassCount = nodeclasscount
  188. c.PricingPV = pvpricing
  189. c.GPUClassPricing = gpupricing
  190. c.GPULabelPricing = gpulabelpricing
  191. } else {
  192. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  193. }
  194. return nil
  195. }
// csvKey is the node lookup key produced by CSVProvider.GetKey.
type csvKey struct {
	// Labels are the labels passed to GetKey; Features and GPUType read them.
	Labels map[string]string
	// ProviderID is the lower-cased node identifier returned by ID.
	ProviderID string
	// GPULabel lists the label names GPUType probes, in order.
	GPULabel []string
	// GPU is the GPU count returned by GPUCount.
	GPU int64
}
  202. func (k *csvKey) Features() string {
  203. instanceType, _ := util.GetInstanceType(k.Labels)
  204. region, _ := util.GetRegion(k.Labels)
  205. class := "node"
  206. return region + "," + instanceType + "," + class
  207. }
  208. func (k *csvKey) GPUCount() int {
  209. return int(k.GPU)
  210. }
  211. func (k *csvKey) GPUType() string {
  212. for _, label := range k.GPULabel {
  213. if val, ok := k.Labels[label]; ok {
  214. return val
  215. }
  216. }
  217. return ""
  218. }
  219. func (k *csvKey) ID() string {
  220. return k.ProviderID
  221. }
  222. func (c *CSVProvider) nodePricing(key models.Key) *models.Node {
  223. if p, ok := c.Pricing[key.ID()]; ok {
  224. return &models.Node{
  225. Cost: p.MarketPriceHourly,
  226. PricingType: models.CsvExact,
  227. }
  228. }
  229. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  230. if len(s) == 2 {
  231. if p, ok := c.Pricing[s[1]]; ok {
  232. return &models.Node{
  233. Cost: p.MarketPriceHourly,
  234. PricingType: models.CsvExact,
  235. }
  236. }
  237. }
  238. classKey := key.Features() // Use node attributes to try and do a class match
  239. if cost, ok := c.NodeClassPricing[classKey]; ok {
  240. log.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
  241. return &models.Node{
  242. Cost: fmt.Sprintf("%f", cost),
  243. PricingType: models.CsvClass,
  244. }
  245. }
  246. return nil
  247. }
  248. func (c *CSVProvider) NodePricing(key models.Key) (*models.Node, models.PricingMetadata, error) {
  249. c.DownloadPricingDataLock.RLock()
  250. defer c.DownloadPricingDataLock.RUnlock()
  251. node := c.nodePricing(key)
  252. if node == nil {
  253. return nil, models.PricingMetadata{}, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
  254. }
  255. if t := key.GPUType(); t != "" {
  256. t = strings.ToLower(t)
  257. count := key.GPUCount()
  258. node.GPU = strconv.Itoa(count)
  259. hourly := 0.0
  260. if p, ok := c.GPUClassPricing[t]; ok {
  261. var err error
  262. hourly, err = strconv.ParseFloat(p.MarketPriceHourly, 64)
  263. if err != nil {
  264. log.Errorf("Unable to parse %s as float", p.MarketPriceHourly)
  265. }
  266. }
  267. totalCost := hourly * float64(count)
  268. node.GPUCost = fmt.Sprintf("%f", hourly)
  269. nc, err := strconv.ParseFloat(node.Cost, 64)
  270. if err != nil {
  271. log.Errorf("Unable to parse %s as float", node.Cost)
  272. }
  273. node.Cost = fmt.Sprintf("%f", nc+totalCost)
  274. }
  275. return node, models.PricingMetadata{}, nil
  276. }
  277. func (c *CSVProvider) GpuPricing(nodeLabels map[string]string) (string, error) {
  278. for key, value := range nodeLabels {
  279. labelKeyValue := key + "=" + value
  280. if p, ok := c.GPULabelPricing[labelKeyValue]; ok {
  281. return p.MarketPriceHourly, nil
  282. }
  283. }
  284. return "", nil
  285. }
  286. func NodeValueFromMapField(m string, n *clustercache.Node, useRegion bool) string {
  287. mf := strings.Split(m, ".")
  288. toReturn := ""
  289. if useRegion {
  290. if region, ok := util.GetRegion(n.Labels); ok {
  291. toReturn = region + ","
  292. } else {
  293. log.Errorf("Getting region based on labels failed")
  294. }
  295. }
  296. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  297. for matchNum, group := range provIdRx.FindStringSubmatch(n.SpecProviderID) {
  298. if matchNum == 2 {
  299. return toReturn + group
  300. }
  301. }
  302. if strings.HasPrefix(n.SpecProviderID, "azure://") {
  303. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.SpecProviderID, "azure://"))
  304. return toReturn + vmOrScaleSet
  305. }
  306. return toReturn + n.SpecProviderID
  307. } else if len(mf) > 1 && mf[0] == "metadata" {
  308. if mf[1] == "name" {
  309. return toReturn + n.Name
  310. } else if mf[1] == "labels" {
  311. lkey := strings.Join(mf[2:], ".")
  312. return toReturn + n.Labels[lkey]
  313. } else if mf[1] == "annotations" {
  314. akey := strings.Join(mf[2:], ".")
  315. return toReturn + n.Annotations[akey]
  316. } else {
  317. log.DedupedInfof(10, "Unsupported InstanceIDField %s in CSV For Node", m)
  318. return ""
  319. }
  320. } else {
  321. log.DedupedInfof(10, "Unsupported InstanceIDField %s in CSV For Node", m)
  322. return ""
  323. }
  324. }
  325. func PVValueFromMapField(m string, n *clustercache.PersistentVolume) string {
  326. mf := strings.Split(m, ".")
  327. if len(mf) > 1 && mf[0] == "metadata" {
  328. if mf[1] == "name" {
  329. return n.Name
  330. } else if mf[1] == "labels" {
  331. lkey := strings.Join(mf[2:], "")
  332. return n.Labels[lkey]
  333. } else if mf[1] == "annotations" {
  334. akey := strings.Join(mf[2:], "")
  335. return n.Annotations[akey]
  336. } else {
  337. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  338. return ""
  339. }
  340. } else if len(mf) > 2 && mf[0] == "spec" {
  341. if mf[1] == "capacity" && mf[2] == "storage" {
  342. skey := n.Spec.Capacity["storage"]
  343. return skey.String()
  344. } else {
  345. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  346. return ""
  347. }
  348. } else if len(mf) > 1 && mf[0] == "spec" {
  349. if mf[1] == "storageClassName" {
  350. return n.Spec.StorageClassName
  351. } else {
  352. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  353. return ""
  354. }
  355. } else {
  356. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  357. return ""
  358. }
  359. }
  360. func (c *CSVProvider) GetKey(l map[string]string, n *clustercache.Node) models.Key {
  361. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  362. var gpuCount int64
  363. gpuCount = 0
  364. if gpuc, ok := n.Status.Capacity["nvidia.com/gpu"]; ok { // TODO: support non-nvidia GPUs
  365. gpuCount = gpuc.Value()
  366. }
  367. return &csvKey{
  368. ProviderID: strings.ToLower(id),
  369. Labels: l,
  370. GPULabel: c.GPUMapFields,
  371. GPU: gpuCount,
  372. }
  373. }
// csvPVKey is the persistent-volume lookup key produced by
// CSVProvider.GetPVKey.
type csvPVKey struct {
	// Labels are the volume's labels.
	Labels map[string]string
	// ProviderID is the lower-cased identifier returned by Features (not by ID).
	ProviderID string
	// StorageClassName is returned by GetStorageClass.
	StorageClassName string
	// StorageClassParameters holds the parameters passed to GetPVKey.
	StorageClassParameters map[string]string
	// Name is the volume name.
	Name string
	// DefaultRegion is the default region passed to GetPVKey.
	DefaultRegion string
}
  382. func (key *csvPVKey) ID() string {
  383. return ""
  384. }
  385. func (key *csvPVKey) GetStorageClass() string {
  386. return key.StorageClassName
  387. }
  388. func (key *csvPVKey) Features() string {
  389. return key.ProviderID
  390. }
  391. func (c *CSVProvider) GetPVKey(pv *clustercache.PersistentVolume, parameters map[string]string, defaultRegion string) models.PVKey {
  392. id := PVValueFromMapField(c.PVMapField, pv)
  393. return &csvPVKey{
  394. Labels: pv.Labels,
  395. ProviderID: strings.ToLower(id),
  396. StorageClassName: pv.Spec.StorageClassName,
  397. StorageClassParameters: parameters,
  398. Name: pv.Name,
  399. DefaultRegion: defaultRegion,
  400. }
  401. }
  402. func (c *CSVProvider) PVPricing(pvk models.PVKey) (*models.PV, error) {
  403. c.DownloadPricingDataLock.RLock()
  404. defer c.DownloadPricingDataLock.RUnlock()
  405. pricing, ok := c.PricingPV[pvk.Features()]
  406. if !ok {
  407. log.Debugf("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  408. return &models.PV{}, nil
  409. }
  410. return &models.PV{
  411. Cost: pricing.MarketPriceHourly,
  412. }, nil
  413. }
  414. func (c *CSVProvider) ServiceAccountStatus() *models.ServiceAccountStatus {
  415. return &models.ServiceAccountStatus{
  416. Checks: []*models.ServiceAccountCheck{},
  417. }
  418. }
  419. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  420. return "", 0.0, nil
  421. }
  422. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  423. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  424. }
  425. func (c *CSVProvider) Regions() []string {
  426. return []string{}
  427. }
  428. func (c *CSVProvider) PricingSourceSummary() interface{} {
  429. return c.Pricing
  430. }