csvprovider.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. package provider
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/opencost/opencost/core/pkg/util"
  13. "github.com/opencost/opencost/pkg/cloud/models"
  14. "github.com/opencost/opencost/pkg/clustercache"
  15. "github.com/opencost/opencost/pkg/env"
  16. "github.com/aws/aws-sdk-go/aws"
  17. "github.com/aws/aws-sdk-go/aws/session"
  18. "github.com/aws/aws-sdk-go/service/s3"
  19. "github.com/opencost/opencost/core/pkg/log"
  20. v1 "k8s.io/api/core/v1"
  21. "github.com/jszwec/csvutil"
  22. )
  23. const refreshMinutes = 60
  24. var (
  25. provIdRx = regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  26. )
// CSVProvider is a pricing provider whose node, persistent-volume and GPU
// prices are loaded from a CSV source by DownloadPricingData.
type CSVProvider struct {
	*CustomProvider
	// CSVLocation is the CSV source: a local path, or an "s3://" URI.
	CSVLocation string
	// Pricing holds "node" rows (and rows with an unrecognized asset class),
	// keyed by the lower-cased InstanceID, prefixed with "<region>," when the
	// row carries a Region.
	Pricing map[string]*price
	// NodeClassPricing is the class-level node price keyed by
	// "<Region>,<InstanceType>,<AssetClass>"; nodePricing consults it when no
	// exact match exists.
	NodeClassPricing map[string]float64
	// NodeClassCount is per-class bookkeeping kept alongside NodeClassPricing
	// while the CSV is loaded; it uses the same keys.
	NodeClassCount map[string]float64
	// NodeMapField is the InstanceIDField of the most recently read node row;
	// GetKey passes it to NodeValueFromMapField.
	NodeMapField string
	// PricingPV holds "pv" rows, keyed like Pricing.
	PricingPV map[string]*price
	// PVMapField is the InstanceIDField of the most recently read pv row;
	// GetPVKey passes it to PVValueFromMapField.
	PVMapField string
	// GPUClassPricing holds "gpu" rows, keyed like Pricing.
	GPUClassPricing map[string]*price
	GPUMapFields []string // Fields in a node's labels that represent the GPU class.
	// UsesRegion is set once any row with a non-empty Region has been read.
	UsesRegion bool
	// DownloadPricingDataLock is write-locked by DownloadPricingData and
	// read-locked by NodePricing and PVPricing.
	DownloadPricingDataLock sync.RWMutex
}
// price is one row of the pricing CSV. Every column is kept as a string; the
// csv tags supply the column names from which DownloadPricingData derives the
// decoder header.
type price struct {
	EndTimestamp string `csv:"EndTimestamp"`
	// InstanceID is lower-cased and used as the lookup key for the row.
	InstanceID string `csv:"InstanceID"`
	// Region, when non-empty, is lower-cased and prepended to the lookup key.
	Region string `csv:"Region"`
	// AssetClass routes the row: "pv", "node" or "gpu"; any other value is
	// treated as a node.
	AssetClass string `csv:"AssetClass"`
	// InstanceIDField is a dotted field path (for example "spec.providerID"
	// or "metadata.name") naming the object field matched against InstanceID.
	InstanceIDField string `csv:"InstanceIDField"`
	// InstanceType is part of the node class key.
	InstanceType string `csv:"InstanceType"`
	// MarketPriceHourly is the price text; it is parsed with
	// strconv.ParseFloat where a numeric value is needed.
	MarketPriceHourly string `csv:"MarketPriceHourly"`
	Version string `csv:"Version"`
}
  51. func GetCsv(location string) (io.Reader, error) {
  52. return os.Open(location)
  53. }
  54. func (c *CSVProvider) DownloadPricingData() error {
  55. c.DownloadPricingDataLock.Lock()
  56. defer time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  57. defer c.DownloadPricingDataLock.Unlock()
  58. pricing := make(map[string]*price)
  59. nodeclasspricing := make(map[string]float64)
  60. nodeclasscount := make(map[string]float64)
  61. pvpricing := make(map[string]*price)
  62. gpupricing := make(map[string]*price)
  63. c.GPUMapFields = make([]string, 0, 1)
  64. header, err := csvutil.Header(price{}, "csv")
  65. if err != nil {
  66. return err
  67. }
  68. fieldsPerRecord := len(header)
  69. var csvr io.Reader
  70. var csverr error
  71. if strings.HasPrefix(c.CSVLocation, "s3://") {
  72. region := env.GetCSVRegion()
  73. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  74. endpoint := env.GetCSVEndpoint()
  75. if endpoint != "" {
  76. conf = conf.WithEndpoint(endpoint)
  77. }
  78. s3Client := s3.New(session.New(conf))
  79. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  80. if len(bucketAndKey) == 2 {
  81. out, err := s3Client.GetObject(&s3.GetObjectInput{
  82. Bucket: aws.String(bucketAndKey[0]),
  83. Key: aws.String(bucketAndKey[1]),
  84. })
  85. csverr = err
  86. csvr = out.Body
  87. } else {
  88. c.Pricing = pricing
  89. c.NodeClassPricing = nodeclasspricing
  90. c.NodeClassCount = nodeclasscount
  91. c.PricingPV = pvpricing
  92. c.GPUClassPricing = gpupricing
  93. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  94. }
  95. } else {
  96. csvr, csverr = GetCsv(c.CSVLocation)
  97. }
  98. if csverr != nil {
  99. log.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  100. c.Pricing = pricing
  101. c.NodeClassPricing = nodeclasspricing
  102. c.NodeClassCount = nodeclasscount
  103. c.PricingPV = pvpricing
  104. c.GPUClassPricing = gpupricing
  105. return nil
  106. }
  107. csvReader := csv.NewReader(csvr)
  108. csvReader.Comma = ','
  109. csvReader.FieldsPerRecord = fieldsPerRecord
  110. dec, err := csvutil.NewDecoder(csvReader, header...)
  111. if err != nil {
  112. c.Pricing = pricing
  113. c.NodeClassPricing = nodeclasspricing
  114. c.NodeClassCount = nodeclasscount
  115. c.PricingPV = pvpricing
  116. c.GPUClassPricing = gpupricing
  117. return err
  118. }
  119. for {
  120. p := price{}
  121. err := dec.Decode(&p)
  122. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  123. if err == io.EOF {
  124. break
  125. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  126. rec := dec.Record()
  127. if len(rec) != 1 {
  128. log.Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  129. continue
  130. }
  131. if strings.Index(rec[0], "#") == 0 {
  132. continue
  133. } else {
  134. log.Infof("skipping non-CSV line: %s", rec)
  135. continue
  136. }
  137. } else if err != nil {
  138. log.Infof("Error during spot info decode: %+v", err)
  139. continue
  140. }
  141. log.Infof("Found price info %+v", p)
  142. key := strings.ToLower(p.InstanceID)
  143. if p.Region != "" { // strip the casing from region and add to key.
  144. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  145. c.UsesRegion = true
  146. }
  147. if p.AssetClass == "pv" {
  148. pvpricing[key] = &p
  149. c.PVMapField = p.InstanceIDField
  150. } else if p.AssetClass == "node" {
  151. pricing[key] = &p
  152. classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
  153. cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
  154. if err != nil {
  155. } else {
  156. if _, ok := nodeclasspricing[classKey]; ok {
  157. oldPrice := nodeclasspricing[classKey]
  158. oldCount := nodeclasscount[classKey]
  159. newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
  160. nodeclasscount[classKey] = newPrice
  161. nodeclasscount[classKey]++
  162. } else {
  163. nodeclasspricing[classKey] = cost
  164. nodeclasscount[classKey] = 1
  165. }
  166. }
  167. c.NodeMapField = p.InstanceIDField
  168. } else if p.AssetClass == "gpu" {
  169. gpupricing[key] = &p
  170. c.GPUMapFields = append(c.GPUMapFields, strings.ToLower(p.InstanceIDField))
  171. } else {
  172. log.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  173. pricing[key] = &p
  174. c.NodeMapField = p.InstanceIDField
  175. }
  176. }
  177. if len(pricing) > 0 {
  178. c.Pricing = pricing
  179. c.NodeClassPricing = nodeclasspricing
  180. c.NodeClassCount = nodeclasscount
  181. c.PricingPV = pvpricing
  182. c.GPUClassPricing = gpupricing
  183. } else {
  184. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  185. }
  186. return nil
  187. }
// csvKey is the node lookup key built by GetKey.
type csvKey struct {
	// Labels are the labels passed to GetKey; Features and GPUType read them.
	Labels map[string]string
	// ProviderID is the lower-cased identifier returned by ID.
	ProviderID string
	// GPULabel lists the label names that GPUType checks, in order.
	GPULabel []string
	// GPU is the GPU count returned by GPUCount.
	GPU int64
}
  194. func (k *csvKey) Features() string {
  195. instanceType, _ := util.GetInstanceType(k.Labels)
  196. region, _ := util.GetRegion(k.Labels)
  197. class := "node"
  198. return region + "," + instanceType + "," + class
  199. }
  200. func (k *csvKey) GPUCount() int {
  201. return int(k.GPU)
  202. }
  203. func (k *csvKey) GPUType() string {
  204. for _, label := range k.GPULabel {
  205. if val, ok := k.Labels[label]; ok {
  206. return val
  207. }
  208. }
  209. return ""
  210. }
  211. func (k *csvKey) ID() string {
  212. return k.ProviderID
  213. }
  214. func (c *CSVProvider) nodePricing(key models.Key) *models.Node {
  215. if p, ok := c.Pricing[key.ID()]; ok {
  216. return &models.Node{
  217. Cost: p.MarketPriceHourly,
  218. PricingType: models.CsvExact,
  219. }
  220. }
  221. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  222. if len(s) == 2 {
  223. if p, ok := c.Pricing[s[1]]; ok {
  224. return &models.Node{
  225. Cost: p.MarketPriceHourly,
  226. PricingType: models.CsvExact,
  227. }
  228. }
  229. }
  230. classKey := key.Features() // Use node attributes to try and do a class match
  231. if cost, ok := c.NodeClassPricing[classKey]; ok {
  232. log.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
  233. return &models.Node{
  234. Cost: fmt.Sprintf("%f", cost),
  235. PricingType: models.CsvClass,
  236. }
  237. }
  238. return nil
  239. }
  240. func (c *CSVProvider) NodePricing(key models.Key) (*models.Node, models.PricingMetadata, error) {
  241. c.DownloadPricingDataLock.RLock()
  242. defer c.DownloadPricingDataLock.RUnlock()
  243. node := c.nodePricing(key)
  244. if node == nil {
  245. return nil, models.PricingMetadata{}, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
  246. }
  247. if t := key.GPUType(); t != "" {
  248. t = strings.ToLower(t)
  249. count := key.GPUCount()
  250. node.GPU = strconv.Itoa(count)
  251. hourly := 0.0
  252. if p, ok := c.GPUClassPricing[t]; ok {
  253. var err error
  254. hourly, err = strconv.ParseFloat(p.MarketPriceHourly, 64)
  255. if err != nil {
  256. log.Errorf("Unable to parse %s as float", p.MarketPriceHourly)
  257. }
  258. }
  259. totalCost := hourly * float64(count)
  260. node.GPUCost = fmt.Sprintf("%f", hourly)
  261. nc, err := strconv.ParseFloat(node.Cost, 64)
  262. if err != nil {
  263. log.Errorf("Unable to parse %s as float", node.Cost)
  264. }
  265. node.Cost = fmt.Sprintf("%f", nc+totalCost)
  266. }
  267. return node, models.PricingMetadata{}, nil
  268. }
  269. func NodeValueFromMapField(m string, n *clustercache.Node, useRegion bool) string {
  270. mf := strings.Split(m, ".")
  271. toReturn := ""
  272. if useRegion {
  273. if region, ok := util.GetRegion(n.Labels); ok {
  274. toReturn = region + ","
  275. } else {
  276. log.Errorf("Getting region based on labels failed")
  277. }
  278. }
  279. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  280. for matchNum, group := range provIdRx.FindStringSubmatch(n.SpecProviderID) {
  281. if matchNum == 2 {
  282. return toReturn + group
  283. }
  284. }
  285. if strings.HasPrefix(n.SpecProviderID, "azure://") {
  286. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.SpecProviderID, "azure://"))
  287. return toReturn + vmOrScaleSet
  288. }
  289. return toReturn + n.SpecProviderID
  290. } else if len(mf) > 1 && mf[0] == "metadata" {
  291. if mf[1] == "name" {
  292. return toReturn + n.Name
  293. } else if mf[1] == "labels" {
  294. lkey := strings.Join(mf[2:len(mf)], ".")
  295. return toReturn + n.Labels[lkey]
  296. } else if mf[1] == "annotations" {
  297. akey := strings.Join(mf[2:len(mf)], ".")
  298. return toReturn + n.Annotations[akey]
  299. } else {
  300. log.DedupedInfof(10, "Unsupported InstanceIDField %s in CSV For Node", m)
  301. return ""
  302. }
  303. } else {
  304. log.DedupedInfof(10, "Unsupported InstanceIDField %s in CSV For Node", m)
  305. return ""
  306. }
  307. }
  308. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  309. mf := strings.Split(m, ".")
  310. if len(mf) > 1 && mf[0] == "metadata" {
  311. if mf[1] == "name" {
  312. return n.Name
  313. } else if mf[1] == "labels" {
  314. lkey := strings.Join(mf[2:len(mf)], "")
  315. return n.Labels[lkey]
  316. } else if mf[1] == "annotations" {
  317. akey := strings.Join(mf[2:len(mf)], "")
  318. return n.Annotations[akey]
  319. } else {
  320. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  321. return ""
  322. }
  323. } else if len(mf) > 2 && mf[0] == "spec" {
  324. if mf[1] == "capacity" && mf[2] == "storage" {
  325. skey := n.Spec.Capacity["storage"]
  326. return skey.String()
  327. } else {
  328. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  329. return ""
  330. }
  331. } else if len(mf) > 1 && mf[0] == "spec" {
  332. if mf[1] == "storageClassName" {
  333. return n.Spec.StorageClassName
  334. } else {
  335. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  336. return ""
  337. }
  338. } else {
  339. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  340. return ""
  341. }
  342. }
  343. func (c *CSVProvider) GetKey(l map[string]string, n *clustercache.Node) models.Key {
  344. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  345. var gpuCount int64
  346. gpuCount = 0
  347. if gpuc, ok := n.Status.Capacity["nvidia.com/gpu"]; ok { // TODO: support non-nvidia GPUs
  348. gpuCount = gpuc.Value()
  349. }
  350. return &csvKey{
  351. ProviderID: strings.ToLower(id),
  352. Labels: l,
  353. GPULabel: c.GPUMapFields,
  354. GPU: gpuCount,
  355. }
  356. }
  357. type csvPVKey struct {
  358. Labels map[string]string
  359. ProviderID string
  360. StorageClassName string
  361. StorageClassParameters map[string]string
  362. Name string
  363. DefaultRegion string
  364. }
  365. func (key *csvPVKey) ID() string {
  366. return ""
  367. }
  368. func (key *csvPVKey) GetStorageClass() string {
  369. return key.StorageClassName
  370. }
  371. func (key *csvPVKey) Features() string {
  372. return key.ProviderID
  373. }
  374. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) models.PVKey {
  375. id := PVValueFromMapField(c.PVMapField, pv)
  376. return &csvPVKey{
  377. Labels: pv.Labels,
  378. ProviderID: id,
  379. StorageClassName: pv.Spec.StorageClassName,
  380. StorageClassParameters: parameters,
  381. Name: pv.Name,
  382. DefaultRegion: defaultRegion,
  383. }
  384. }
  385. func (c *CSVProvider) PVPricing(pvk models.PVKey) (*models.PV, error) {
  386. c.DownloadPricingDataLock.RLock()
  387. defer c.DownloadPricingDataLock.RUnlock()
  388. pricing, ok := c.PricingPV[pvk.Features()]
  389. if !ok {
  390. log.Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  391. return &models.PV{}, nil
  392. }
  393. return &models.PV{
  394. Cost: pricing.MarketPriceHourly,
  395. }, nil
  396. }
  397. func (c *CSVProvider) ServiceAccountStatus() *models.ServiceAccountStatus {
  398. return &models.ServiceAccountStatus{
  399. Checks: []*models.ServiceAccountCheck{},
  400. }
  401. }
  402. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  403. return "", 0.0, nil
  404. }
  405. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  406. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  407. }
  408. func (c *CSVProvider) Regions() []string {
  409. return []string{}
  410. }
  411. func (c *CSVProvider) PricingSourceSummary() interface{} {
  412. return c.Pricing
  413. }