csvprovider.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/opencost/opencost/pkg/env"
  12. "github.com/opencost/opencost/pkg/util"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/session"
  15. "github.com/aws/aws-sdk-go/service/s3"
  16. "github.com/opencost/opencost/pkg/log"
  17. v1 "k8s.io/api/core/v1"
  18. "github.com/jszwec/csvutil"
  19. )
  20. const refreshMinutes = 60
// CSVProvider prices nodes, GPUs and persistent volumes from a CSV price sheet
// that DownloadPricingData reads from a local file or an S3 object. Methods
// CSVProvider does not define itself are promoted from the embedded
// *CustomProvider.
type CSVProvider struct {
	*CustomProvider
	CSVLocation             string             // local file path or "s3://bucket/key" URI of the price sheet
	Pricing                 map[string]*price  // node rows (and rows with an unrecognized AssetClass), keyed by lower-cased InstanceID or "region,instanceid"
	NodeClassPricing        map[string]float64 // hourly price per "Region,InstanceType,AssetClass" class key; used by NodePricing for class matching
	NodeClassCount          map[string]float64 // intended as the number of rows averaged into each NodeClassPricing entry
	NodeMapField            string             // InstanceIDField of node rows: dotted path used to derive a node's lookup ID
	PricingPV               map[string]*price  // pv rows, keyed the same way as Pricing
	PVMapField              string             // InstanceIDField of pv rows: dotted path used to derive a PV's lookup ID
	GPUClassPricing         map[string]*price  // gpu rows, keyed the same way as Pricing
	GPUMapFields            []string           // Fields in a node's labels that represent the GPU class.
	UsesRegion              bool               // set once any row has a Region; node lookup IDs are then prefixed with "region,"
	DownloadPricingDataLock sync.RWMutex       // held for writing by DownloadPricingData; NodePricing and PVPricing read under RLock
}
// price is one row of the CSV price sheet. DownloadPricingData builds the
// decoder's header from these csv tags with csvutil.Header and requires every
// row to have exactly that many fields. The field order here is therefore the
// column order expected in the sheet; do not reorder the fields.
type price struct {
	EndTimestamp      string `csv:"EndTimestamp"`      // not read elsewhere in this file
	InstanceID        string `csv:"InstanceID"`        // lookup key (lower-cased; "region," prefixed when Region is set)
	Region            string `csv:"Region"`            // optional; lower-cased into the lookup key when present
	AssetClass        string `csv:"AssetClass"`        // "node", "pv" or "gpu"; anything else is filed as a node
	InstanceIDField   string `csv:"InstanceIDField"`   // node/pv rows: dotted object path holding the ID; gpu rows: node label name holding the GPU type
	InstanceType      string `csv:"InstanceType"`      // part of the node class key
	MarketPriceHourly string `csv:"MarketPriceHourly"` // hourly price as text, parsed with strconv.ParseFloat where needed
	Version           string `csv:"Version"`           // not read elsewhere in this file
}
  45. func GetCsv(location string) (io.Reader, error) {
  46. return os.Open(location)
  47. }
  48. func (c *CSVProvider) DownloadPricingData() error {
  49. c.DownloadPricingDataLock.Lock()
  50. defer time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  51. defer c.DownloadPricingDataLock.Unlock()
  52. pricing := make(map[string]*price)
  53. nodeclasspricing := make(map[string]float64)
  54. nodeclasscount := make(map[string]float64)
  55. pvpricing := make(map[string]*price)
  56. gpupricing := make(map[string]*price)
  57. c.GPUMapFields = make([]string, 0, 1)
  58. header, err := csvutil.Header(price{}, "csv")
  59. if err != nil {
  60. return err
  61. }
  62. fieldsPerRecord := len(header)
  63. var csvr io.Reader
  64. var csverr error
  65. if strings.HasPrefix(c.CSVLocation, "s3://") {
  66. region := env.GetCSVRegion()
  67. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  68. endpoint := env.GetCSVEndpoint()
  69. if endpoint != "" {
  70. conf = conf.WithEndpoint(endpoint)
  71. }
  72. s3Client := s3.New(session.New(conf))
  73. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  74. if len(bucketAndKey) == 2 {
  75. out, err := s3Client.GetObject(&s3.GetObjectInput{
  76. Bucket: aws.String(bucketAndKey[0]),
  77. Key: aws.String(bucketAndKey[1]),
  78. })
  79. csverr = err
  80. csvr = out.Body
  81. } else {
  82. c.Pricing = pricing
  83. c.NodeClassPricing = nodeclasspricing
  84. c.NodeClassCount = nodeclasscount
  85. c.PricingPV = pvpricing
  86. c.GPUClassPricing = gpupricing
  87. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  88. }
  89. } else {
  90. csvr, csverr = GetCsv(c.CSVLocation)
  91. }
  92. if csverr != nil {
  93. log.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  94. c.Pricing = pricing
  95. c.NodeClassPricing = nodeclasspricing
  96. c.NodeClassCount = nodeclasscount
  97. c.PricingPV = pvpricing
  98. c.GPUClassPricing = gpupricing
  99. return nil
  100. }
  101. csvReader := csv.NewReader(csvr)
  102. csvReader.Comma = ','
  103. csvReader.FieldsPerRecord = fieldsPerRecord
  104. dec, err := csvutil.NewDecoder(csvReader, header...)
  105. if err != nil {
  106. c.Pricing = pricing
  107. c.NodeClassPricing = nodeclasspricing
  108. c.NodeClassCount = nodeclasscount
  109. c.PricingPV = pvpricing
  110. c.GPUClassPricing = gpupricing
  111. return err
  112. }
  113. for {
  114. p := price{}
  115. err := dec.Decode(&p)
  116. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  117. if err == io.EOF {
  118. break
  119. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  120. rec := dec.Record()
  121. if len(rec) != 1 {
  122. log.Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  123. continue
  124. }
  125. if strings.Index(rec[0], "#") == 0 {
  126. continue
  127. } else {
  128. log.Infof("skipping non-CSV line: %s", rec)
  129. continue
  130. }
  131. } else if err != nil {
  132. log.Infof("Error during spot info decode: %+v", err)
  133. continue
  134. }
  135. log.Infof("Found price info %+v", p)
  136. key := strings.ToLower(p.InstanceID)
  137. if p.Region != "" { // strip the casing from region and add to key.
  138. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  139. c.UsesRegion = true
  140. }
  141. if p.AssetClass == "pv" {
  142. pvpricing[key] = &p
  143. c.PVMapField = p.InstanceIDField
  144. } else if p.AssetClass == "node" {
  145. pricing[key] = &p
  146. classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
  147. cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
  148. if err != nil {
  149. } else {
  150. if _, ok := nodeclasspricing[classKey]; ok {
  151. oldPrice := nodeclasspricing[classKey]
  152. oldCount := nodeclasscount[classKey]
  153. newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
  154. nodeclasscount[classKey] = newPrice
  155. nodeclasscount[classKey]++
  156. } else {
  157. nodeclasspricing[classKey] = cost
  158. nodeclasscount[classKey] = 1
  159. }
  160. }
  161. c.NodeMapField = p.InstanceIDField
  162. } else if p.AssetClass == "gpu" {
  163. gpupricing[key] = &p
  164. c.GPUMapFields = append(c.GPUMapFields, strings.ToLower(p.InstanceIDField))
  165. } else {
  166. log.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  167. pricing[key] = &p
  168. c.NodeMapField = p.InstanceIDField
  169. }
  170. }
  171. if len(pricing) > 0 {
  172. c.Pricing = pricing
  173. c.NodeClassPricing = nodeclasspricing
  174. c.NodeClassCount = nodeclasscount
  175. c.PricingPV = pvpricing
  176. c.GPUClassPricing = gpupricing
  177. } else {
  178. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  179. }
  180. return nil
  181. }
// csvKey is the Key implementation that CSVProvider.GetKey builds for a node.
type csvKey struct {
	Labels     map[string]string // node labels; read by Features and GPUType
	ProviderID string            // lookup ID derived from the node via NodeMapField (may be "region,"-prefixed)
	GPULabel   []string          // label names that may hold the GPU type (CSVProvider.GPUMapFields)
	GPU        int64             // GPU count taken from the node's "nvidia.com/gpu" capacity
}
  188. func (k *csvKey) Features() string {
  189. instanceType, _ := util.GetInstanceType(k.Labels)
  190. region, _ := util.GetRegion(k.Labels)
  191. class := "node"
  192. return region + "," + instanceType + "," + class
  193. }
  194. func (k *csvKey) GPUCount() int {
  195. return int(k.GPU)
  196. }
  197. func (k *csvKey) GPUType() string {
  198. for _, label := range k.GPULabel {
  199. if val, ok := k.Labels[label]; ok {
  200. return val
  201. }
  202. }
  203. return ""
  204. }
  205. func (k *csvKey) ID() string {
  206. return k.ProviderID
  207. }
  208. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  209. c.DownloadPricingDataLock.RLock()
  210. defer c.DownloadPricingDataLock.RUnlock()
  211. var node *Node
  212. if p, ok := c.Pricing[key.ID()]; ok {
  213. node = &Node{
  214. Cost: p.MarketPriceHourly,
  215. PricingType: CsvExact,
  216. }
  217. }
  218. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  219. if len(s) == 2 {
  220. if p, ok := c.Pricing[s[1]]; ok {
  221. node = &Node{
  222. Cost: p.MarketPriceHourly,
  223. PricingType: CsvExact,
  224. }
  225. }
  226. }
  227. classKey := key.Features() // Use node attributes to try and do a class match
  228. if cost, ok := c.NodeClassPricing[classKey]; ok {
  229. log.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
  230. node = &Node{
  231. Cost: fmt.Sprintf("%f", cost),
  232. PricingType: CsvClass,
  233. }
  234. }
  235. if node != nil {
  236. if t := key.GPUType(); t != "" {
  237. t = strings.ToLower(t)
  238. count := key.GPUCount()
  239. node.GPU = strconv.Itoa(count)
  240. hourly := 0.0
  241. if p, ok := c.GPUClassPricing[t]; ok {
  242. var err error
  243. hourly, err = strconv.ParseFloat(p.MarketPriceHourly, 64)
  244. if err != nil {
  245. log.Errorf("Unable to parse %s as float", p.MarketPriceHourly)
  246. }
  247. }
  248. totalCost := hourly * float64(count)
  249. node.GPUCost = fmt.Sprintf("%f", totalCost)
  250. nc, err := strconv.ParseFloat(node.Cost, 64)
  251. if err != nil {
  252. log.Errorf("Unable to parse %s as float", node.Cost)
  253. }
  254. node.Cost = fmt.Sprintf("%f", nc+totalCost)
  255. }
  256. return node, nil
  257. } else {
  258. return nil, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
  259. }
  260. }
  261. func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
  262. mf := strings.Split(m, ".")
  263. toReturn := ""
  264. if useRegion {
  265. if region, ok := util.GetRegion(n.Labels); ok {
  266. toReturn = region + ","
  267. } else {
  268. log.Errorf("Getting region based on labels failed")
  269. }
  270. }
  271. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  272. for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
  273. if matchNum == 2 {
  274. return toReturn + group
  275. }
  276. }
  277. if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
  278. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.Spec.ProviderID, "azure://"))
  279. return toReturn + vmOrScaleSet
  280. }
  281. return toReturn + n.Spec.ProviderID
  282. } else if len(mf) > 1 && mf[0] == "metadata" {
  283. if mf[1] == "name" {
  284. return toReturn + n.Name
  285. } else if mf[1] == "labels" {
  286. lkey := strings.Join(mf[2:len(mf)], "")
  287. return toReturn + n.Labels[lkey]
  288. } else if mf[1] == "annotations" {
  289. akey := strings.Join(mf[2:len(mf)], "")
  290. return toReturn + n.Annotations[akey]
  291. } else {
  292. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  293. return ""
  294. }
  295. } else {
  296. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  297. return ""
  298. }
  299. }
  300. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  301. mf := strings.Split(m, ".")
  302. if len(mf) > 1 && mf[0] == "metadata" {
  303. if mf[1] == "name" {
  304. return n.Name
  305. } else if mf[1] == "labels" {
  306. lkey := strings.Join(mf[2:len(mf)], "")
  307. return n.Labels[lkey]
  308. } else if mf[1] == "annotations" {
  309. akey := strings.Join(mf[2:len(mf)], "")
  310. return n.Annotations[akey]
  311. } else {
  312. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  313. return ""
  314. }
  315. } else if len(mf) > 2 && mf[0] == "spec" {
  316. if mf[1] == "capacity" && mf[2] == "storage" {
  317. skey := n.Spec.Capacity["storage"]
  318. return skey.String()
  319. } else {
  320. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  321. return ""
  322. }
  323. } else {
  324. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  325. return ""
  326. }
  327. }
  328. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  329. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  330. var gpuCount int64
  331. gpuCount = 0
  332. if gpuc, ok := n.Status.Capacity["nvidia.com/gpu"]; ok { // TODO: support non-nvidia GPUs
  333. gpuCount = gpuc.Value()
  334. }
  335. return &csvKey{
  336. ProviderID: id,
  337. Labels: l,
  338. GPULabel: c.GPUMapFields,
  339. GPU: gpuCount,
  340. }
  341. }
// csvPVKey is the PVKey implementation that CSVProvider.GetPVKey builds for a
// persistent volume.
type csvPVKey struct {
	Labels                 map[string]string // PV labels (not read by methods in this file)
	ProviderID             string            // value extracted via PVMapField; returned by Features as the PricingPV lookup key
	StorageClassName       string            // returned by GetStorageClass
	StorageClassParameters map[string]string // not read by methods in this file
	Name                   string            // PV name (not read by methods in this file)
	DefaultRegion          string            // NOTE(review): stored but not read by any method in this file
}
  350. func (key *csvPVKey) ID() string {
  351. return ""
  352. }
  353. func (key *csvPVKey) GetStorageClass() string {
  354. return key.StorageClassName
  355. }
  356. func (key *csvPVKey) Features() string {
  357. return key.ProviderID
  358. }
  359. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  360. id := PVValueFromMapField(c.PVMapField, pv)
  361. return &csvPVKey{
  362. Labels: pv.Labels,
  363. ProviderID: id,
  364. StorageClassName: pv.Spec.StorageClassName,
  365. StorageClassParameters: parameters,
  366. Name: pv.Name,
  367. DefaultRegion: defaultRegion,
  368. }
  369. }
  370. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  371. c.DownloadPricingDataLock.RLock()
  372. defer c.DownloadPricingDataLock.RUnlock()
  373. pricing, ok := c.PricingPV[pvk.Features()]
  374. if !ok {
  375. log.Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  376. return &PV{}, nil
  377. }
  378. return &PV{
  379. Cost: pricing.MarketPriceHourly,
  380. }, nil
  381. }
  382. func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
  383. return &ServiceAccountStatus{
  384. Checks: []*ServiceAccountCheck{},
  385. }
  386. }
  387. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  388. return "", 0.0, nil
  389. }
  390. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  391. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  392. }
  393. func (c *CSVProvider) Regions() []string {
  394. return []string{}
  395. }
  396. func (c *CSVProvider) PricingSourceSummary() interface{} {
  397. return c.Pricing
  398. }