csvprovider.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. package cloud
import (
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/jszwec/csvutil"
	"github.com/opencost/opencost/pkg/cloud/models"
	"github.com/opencost/opencost/pkg/env"
	"github.com/opencost/opencost/pkg/log"
	"github.com/opencost/opencost/pkg/util"
	v1 "k8s.io/api/core/v1"
)
  21. const refreshMinutes = 60
  22. type CSVProvider struct {
  23. *CustomProvider
  24. CSVLocation string
  25. Pricing map[string]*price
  26. NodeClassPricing map[string]float64
  27. NodeClassCount map[string]float64
  28. NodeMapField string
  29. PricingPV map[string]*price
  30. PVMapField string
  31. GPUClassPricing map[string]*price
  32. GPUMapFields []string // Fields in a node's labels that represent the GPU class.
  33. UsesRegion bool
  34. DownloadPricingDataLock sync.RWMutex
  35. }
  36. type price struct {
  37. EndTimestamp string `csv:"EndTimestamp"`
  38. InstanceID string `csv:"InstanceID"`
  39. Region string `csv:"Region"`
  40. AssetClass string `csv:"AssetClass"`
  41. InstanceIDField string `csv:"InstanceIDField"`
  42. InstanceType string `csv:"InstanceType"`
  43. MarketPriceHourly string `csv:"MarketPriceHourly"`
  44. Version string `csv:"Version"`
  45. }
  46. func GetCsv(location string) (io.Reader, error) {
  47. return os.Open(location)
  48. }
  49. func (c *CSVProvider) DownloadPricingData() error {
  50. c.DownloadPricingDataLock.Lock()
  51. defer time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  52. defer c.DownloadPricingDataLock.Unlock()
  53. pricing := make(map[string]*price)
  54. nodeclasspricing := make(map[string]float64)
  55. nodeclasscount := make(map[string]float64)
  56. pvpricing := make(map[string]*price)
  57. gpupricing := make(map[string]*price)
  58. c.GPUMapFields = make([]string, 0, 1)
  59. header, err := csvutil.Header(price{}, "csv")
  60. if err != nil {
  61. return err
  62. }
  63. fieldsPerRecord := len(header)
  64. var csvr io.Reader
  65. var csverr error
  66. if strings.HasPrefix(c.CSVLocation, "s3://") {
  67. region := env.GetCSVRegion()
  68. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  69. endpoint := env.GetCSVEndpoint()
  70. if endpoint != "" {
  71. conf = conf.WithEndpoint(endpoint)
  72. }
  73. s3Client := s3.New(session.New(conf))
  74. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  75. if len(bucketAndKey) == 2 {
  76. out, err := s3Client.GetObject(&s3.GetObjectInput{
  77. Bucket: aws.String(bucketAndKey[0]),
  78. Key: aws.String(bucketAndKey[1]),
  79. })
  80. csverr = err
  81. csvr = out.Body
  82. } else {
  83. c.Pricing = pricing
  84. c.NodeClassPricing = nodeclasspricing
  85. c.NodeClassCount = nodeclasscount
  86. c.PricingPV = pvpricing
  87. c.GPUClassPricing = gpupricing
  88. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  89. }
  90. } else {
  91. csvr, csverr = GetCsv(c.CSVLocation)
  92. }
  93. if csverr != nil {
  94. log.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  95. c.Pricing = pricing
  96. c.NodeClassPricing = nodeclasspricing
  97. c.NodeClassCount = nodeclasscount
  98. c.PricingPV = pvpricing
  99. c.GPUClassPricing = gpupricing
  100. return nil
  101. }
  102. csvReader := csv.NewReader(csvr)
  103. csvReader.Comma = ','
  104. csvReader.FieldsPerRecord = fieldsPerRecord
  105. dec, err := csvutil.NewDecoder(csvReader, header...)
  106. if err != nil {
  107. c.Pricing = pricing
  108. c.NodeClassPricing = nodeclasspricing
  109. c.NodeClassCount = nodeclasscount
  110. c.PricingPV = pvpricing
  111. c.GPUClassPricing = gpupricing
  112. return err
  113. }
  114. for {
  115. p := price{}
  116. err := dec.Decode(&p)
  117. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  118. if err == io.EOF {
  119. break
  120. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  121. rec := dec.Record()
  122. if len(rec) != 1 {
  123. log.Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  124. continue
  125. }
  126. if strings.Index(rec[0], "#") == 0 {
  127. continue
  128. } else {
  129. log.Infof("skipping non-CSV line: %s", rec)
  130. continue
  131. }
  132. } else if err != nil {
  133. log.Infof("Error during spot info decode: %+v", err)
  134. continue
  135. }
  136. log.Infof("Found price info %+v", p)
  137. key := strings.ToLower(p.InstanceID)
  138. if p.Region != "" { // strip the casing from region and add to key.
  139. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  140. c.UsesRegion = true
  141. }
  142. if p.AssetClass == "pv" {
  143. pvpricing[key] = &p
  144. c.PVMapField = p.InstanceIDField
  145. } else if p.AssetClass == "node" {
  146. pricing[key] = &p
  147. classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
  148. cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
  149. if err != nil {
  150. } else {
  151. if _, ok := nodeclasspricing[classKey]; ok {
  152. oldPrice := nodeclasspricing[classKey]
  153. oldCount := nodeclasscount[classKey]
  154. newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
  155. nodeclasscount[classKey] = newPrice
  156. nodeclasscount[classKey]++
  157. } else {
  158. nodeclasspricing[classKey] = cost
  159. nodeclasscount[classKey] = 1
  160. }
  161. }
  162. c.NodeMapField = p.InstanceIDField
  163. } else if p.AssetClass == "gpu" {
  164. gpupricing[key] = &p
  165. c.GPUMapFields = append(c.GPUMapFields, strings.ToLower(p.InstanceIDField))
  166. } else {
  167. log.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  168. pricing[key] = &p
  169. c.NodeMapField = p.InstanceIDField
  170. }
  171. }
  172. if len(pricing) > 0 {
  173. c.Pricing = pricing
  174. c.NodeClassPricing = nodeclasspricing
  175. c.NodeClassCount = nodeclasscount
  176. c.PricingPV = pvpricing
  177. c.GPUClassPricing = gpupricing
  178. } else {
  179. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  180. }
  181. return nil
  182. }
  183. type csvKey struct {
  184. Labels map[string]string
  185. ProviderID string
  186. GPULabel []string
  187. GPU int64
  188. }
  189. func (k *csvKey) Features() string {
  190. instanceType, _ := util.GetInstanceType(k.Labels)
  191. region, _ := util.GetRegion(k.Labels)
  192. class := "node"
  193. return region + "," + instanceType + "," + class
  194. }
  195. func (k *csvKey) GPUCount() int {
  196. return int(k.GPU)
  197. }
  198. func (k *csvKey) GPUType() string {
  199. for _, label := range k.GPULabel {
  200. if val, ok := k.Labels[label]; ok {
  201. return val
  202. }
  203. }
  204. return ""
  205. }
  206. func (k *csvKey) ID() string {
  207. return k.ProviderID
  208. }
  209. func (c *CSVProvider) NodePricing(key models.Key) (*models.Node, error) {
  210. c.DownloadPricingDataLock.RLock()
  211. defer c.DownloadPricingDataLock.RUnlock()
  212. var node *models.Node
  213. if p, ok := c.Pricing[key.ID()]; ok {
  214. node = &models.Node{
  215. Cost: p.MarketPriceHourly,
  216. PricingType: models.CsvExact,
  217. }
  218. }
  219. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  220. if len(s) == 2 {
  221. if p, ok := c.Pricing[s[1]]; ok {
  222. node = &models.Node{
  223. Cost: p.MarketPriceHourly,
  224. PricingType: models.CsvExact,
  225. }
  226. }
  227. }
  228. classKey := key.Features() // Use node attributes to try and do a class match
  229. if cost, ok := c.NodeClassPricing[classKey]; ok {
  230. log.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
  231. node = &models.Node{
  232. Cost: fmt.Sprintf("%f", cost),
  233. PricingType: models.CsvClass,
  234. }
  235. }
  236. if node != nil {
  237. if t := key.GPUType(); t != "" {
  238. t = strings.ToLower(t)
  239. count := key.GPUCount()
  240. node.GPU = strconv.Itoa(count)
  241. hourly := 0.0
  242. if p, ok := c.GPUClassPricing[t]; ok {
  243. var err error
  244. hourly, err = strconv.ParseFloat(p.MarketPriceHourly, 64)
  245. if err != nil {
  246. log.Errorf("Unable to parse %s as float", p.MarketPriceHourly)
  247. }
  248. }
  249. totalCost := hourly * float64(count)
  250. node.GPUCost = fmt.Sprintf("%f", totalCost)
  251. nc, err := strconv.ParseFloat(node.Cost, 64)
  252. if err != nil {
  253. log.Errorf("Unable to parse %s as float", node.Cost)
  254. }
  255. node.Cost = fmt.Sprintf("%f", nc+totalCost)
  256. }
  257. return node, nil
  258. } else {
  259. return nil, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
  260. }
  261. }
  262. func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
  263. mf := strings.Split(m, ".")
  264. toReturn := ""
  265. if useRegion {
  266. if region, ok := util.GetRegion(n.Labels); ok {
  267. toReturn = region + ","
  268. } else {
  269. log.Errorf("Getting region based on labels failed")
  270. }
  271. }
  272. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  273. for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
  274. if matchNum == 2 {
  275. return toReturn + group
  276. }
  277. }
  278. if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
  279. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.Spec.ProviderID, "azure://"))
  280. return toReturn + vmOrScaleSet
  281. }
  282. return toReturn + n.Spec.ProviderID
  283. } else if len(mf) > 1 && mf[0] == "metadata" {
  284. if mf[1] == "name" {
  285. return toReturn + n.Name
  286. } else if mf[1] == "labels" {
  287. lkey := strings.Join(mf[2:len(mf)], "")
  288. return toReturn + n.Labels[lkey]
  289. } else if mf[1] == "annotations" {
  290. akey := strings.Join(mf[2:len(mf)], "")
  291. return toReturn + n.Annotations[akey]
  292. } else {
  293. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  294. return ""
  295. }
  296. } else {
  297. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  298. return ""
  299. }
  300. }
  301. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  302. mf := strings.Split(m, ".")
  303. if len(mf) > 1 && mf[0] == "metadata" {
  304. if mf[1] == "name" {
  305. return n.Name
  306. } else if mf[1] == "labels" {
  307. lkey := strings.Join(mf[2:len(mf)], "")
  308. return n.Labels[lkey]
  309. } else if mf[1] == "annotations" {
  310. akey := strings.Join(mf[2:len(mf)], "")
  311. return n.Annotations[akey]
  312. } else {
  313. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  314. return ""
  315. }
  316. } else if len(mf) > 2 && mf[0] == "spec" {
  317. if mf[1] == "capacity" && mf[2] == "storage" {
  318. skey := n.Spec.Capacity["storage"]
  319. return skey.String()
  320. } else {
  321. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  322. return ""
  323. }
  324. } else {
  325. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  326. return ""
  327. }
  328. }
  329. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) models.Key {
  330. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  331. var gpuCount int64
  332. gpuCount = 0
  333. if gpuc, ok := n.Status.Capacity["nvidia.com/gpu"]; ok { // TODO: support non-nvidia GPUs
  334. gpuCount = gpuc.Value()
  335. }
  336. return &csvKey{
  337. ProviderID: id,
  338. Labels: l,
  339. GPULabel: c.GPUMapFields,
  340. GPU: gpuCount,
  341. }
  342. }
  343. type csvPVKey struct {
  344. Labels map[string]string
  345. ProviderID string
  346. StorageClassName string
  347. StorageClassParameters map[string]string
  348. Name string
  349. DefaultRegion string
  350. }
  351. func (key *csvPVKey) ID() string {
  352. return ""
  353. }
  354. func (key *csvPVKey) GetStorageClass() string {
  355. return key.StorageClassName
  356. }
  357. func (key *csvPVKey) Features() string {
  358. return key.ProviderID
  359. }
  360. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) models.PVKey {
  361. id := PVValueFromMapField(c.PVMapField, pv)
  362. return &csvPVKey{
  363. Labels: pv.Labels,
  364. ProviderID: id,
  365. StorageClassName: pv.Spec.StorageClassName,
  366. StorageClassParameters: parameters,
  367. Name: pv.Name,
  368. DefaultRegion: defaultRegion,
  369. }
  370. }
  371. func (c *CSVProvider) PVPricing(pvk models.PVKey) (*models.PV, error) {
  372. c.DownloadPricingDataLock.RLock()
  373. defer c.DownloadPricingDataLock.RUnlock()
  374. pricing, ok := c.PricingPV[pvk.Features()]
  375. if !ok {
  376. log.Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  377. return &models.PV{}, nil
  378. }
  379. return &models.PV{
  380. Cost: pricing.MarketPriceHourly,
  381. }, nil
  382. }
  383. func (c *CSVProvider) ServiceAccountStatus() *models.ServiceAccountStatus {
  384. return &models.ServiceAccountStatus{
  385. Checks: []*models.ServiceAccountCheck{},
  386. }
  387. }
  388. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  389. return "", 0.0, nil
  390. }
  391. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  392. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  393. }
  394. func (c *CSVProvider) Regions() []string {
  395. return []string{}
  396. }
  397. func (c *CSVProvider) PricingSourceSummary() interface{} {
  398. return c.Pricing
  399. }