csvprovider.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/opencost/opencost/pkg/env"
  12. "github.com/opencost/opencost/pkg/util"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/session"
  15. "github.com/aws/aws-sdk-go/service/s3"
  16. "github.com/opencost/opencost/pkg/log"
  17. v1 "k8s.io/api/core/v1"
  18. "github.com/jszwec/csvutil"
  19. )
// refreshMinutes is the delay, in minutes, after which DownloadPricingData
// schedules its next run.
const refreshMinutes = 60
// CSVProvider is a pricing provider whose price tables are loaded from a CSV
// document by DownloadPricingData. It embeds *CustomProvider.
type CSVProvider struct {
	*CustomProvider
	// CSVLocation is where the CSV is read from: an "s3://" URI, or otherwise
	// a location handed to GetCsv.
	CSVLocation string
	// Pricing holds rows whose AssetClass is "node" (or unrecognized), keyed
	// by the lowercased InstanceID, prefixed with "<region>," when the row
	// has a Region.
	Pricing map[string]*price
	// NodeClassPricing and NodeClassCount are keyed by
	// "Region,InstanceType,AssetClass" and are populated from "node" rows.
	NodeClassPricing map[string]float64
	NodeClassCount   map[string]float64
	// NodeMapField is the InstanceIDField of the most recently parsed node
	// row; GetKey uses it to derive a node's lookup ID.
	NodeMapField string
	// PricingPV holds rows whose AssetClass is "pv", keyed like Pricing.
	PricingPV map[string]*price
	// PVMapField is the InstanceIDField of the most recently parsed pv row;
	// GetPVKey uses it to derive a volume's lookup ID.
	PVMapField string
	// GPUClassPricing holds rows whose AssetClass is "gpu", keyed like Pricing.
	GPUClassPricing map[string]*price
	// GPUMapFields collects the lowercased InstanceIDField of every gpu row.
	GPUMapFields []string
	// UsesRegion is set to true once any parsed row carries a Region.
	UsesRegion bool
	// DownloadPricingDataLock is write-locked by DownloadPricingData and
	// read-locked by NodePricing and PVPricing.
	DownloadPricingDataLock sync.RWMutex
}
// price is one decoded CSV record. The `csv` tags name the columns and are
// also used to build the header that DownloadPricingData decodes against.
type price struct {
	EndTimestamp string `csv:"EndTimestamp"`
	InstanceID   string `csv:"InstanceID"`
	Region       string `csv:"Region"`
	// AssetClass selects the destination table: "pv", "node" or "gpu".
	AssetClass string `csv:"AssetClass"`
	// InstanceIDField names the Kubernetes object field (for example
	// "spec.providerID" or "metadata.name") used to match InstanceID.
	InstanceIDField string `csv:"InstanceIDField"`
	InstanceType    string `csv:"InstanceType"`
	// MarketPriceHourly is kept as text; consumers parse it with
	// strconv.ParseFloat where a number is needed.
	MarketPriceHourly string `csv:"MarketPriceHourly"`
	Version           string `csv:"Version"`
}
  45. func GetCsv(location string) (io.Reader, error) {
  46. return os.Open(location)
  47. }
  48. func (c *CSVProvider) DownloadPricingData() error {
  49. c.DownloadPricingDataLock.Lock()
  50. defer time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  51. defer c.DownloadPricingDataLock.Unlock()
  52. pricing := make(map[string]*price)
  53. nodeclasspricing := make(map[string]float64)
  54. nodeclasscount := make(map[string]float64)
  55. pvpricing := make(map[string]*price)
  56. gpupricing := make(map[string]*price)
  57. c.GPUMapFields = make([]string, 0, 1)
  58. header, err := csvutil.Header(price{}, "csv")
  59. if err != nil {
  60. return err
  61. }
  62. fieldsPerRecord := len(header)
  63. var csvr io.Reader
  64. var csverr error
  65. if strings.HasPrefix(c.CSVLocation, "s3://") {
  66. region := env.GetCSVRegion()
  67. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  68. endpoint := env.GetCSVEndpoint()
  69. if endpoint != "" {
  70. conf = conf.WithEndpoint(endpoint)
  71. }
  72. s3Client := s3.New(session.New(conf))
  73. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  74. if len(bucketAndKey) == 2 {
  75. out, err := s3Client.GetObject(&s3.GetObjectInput{
  76. Bucket: aws.String(bucketAndKey[0]),
  77. Key: aws.String(bucketAndKey[1]),
  78. })
  79. csverr = err
  80. csvr = out.Body
  81. } else {
  82. c.Pricing = pricing
  83. c.NodeClassPricing = nodeclasspricing
  84. c.NodeClassCount = nodeclasscount
  85. c.PricingPV = pvpricing
  86. c.GPUClassPricing = gpupricing
  87. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  88. }
  89. } else {
  90. csvr, csverr = GetCsv(c.CSVLocation)
  91. }
  92. if csverr != nil {
  93. log.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  94. c.Pricing = pricing
  95. c.NodeClassPricing = nodeclasspricing
  96. c.NodeClassCount = nodeclasscount
  97. c.PricingPV = pvpricing
  98. c.GPUClassPricing = gpupricing
  99. return nil
  100. }
  101. csvReader := csv.NewReader(csvr)
  102. csvReader.Comma = ','
  103. csvReader.FieldsPerRecord = fieldsPerRecord
  104. dec, err := csvutil.NewDecoder(csvReader, header...)
  105. if err != nil {
  106. c.Pricing = pricing
  107. c.NodeClassPricing = nodeclasspricing
  108. c.NodeClassCount = nodeclasscount
  109. c.PricingPV = pvpricing
  110. c.GPUClassPricing = gpupricing
  111. return err
  112. }
  113. for {
  114. p := price{}
  115. err := dec.Decode(&p)
  116. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  117. if err == io.EOF {
  118. break
  119. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  120. rec := dec.Record()
  121. if len(rec) != 1 {
  122. log.Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  123. continue
  124. }
  125. if strings.Index(rec[0], "#") == 0 {
  126. continue
  127. } else {
  128. log.Infof("skipping non-CSV line: %s", rec)
  129. continue
  130. }
  131. } else if err != nil {
  132. log.Infof("Error during spot info decode: %+v", err)
  133. continue
  134. }
  135. log.Infof("Found price info %+v", p)
  136. key := strings.ToLower(p.InstanceID)
  137. if p.Region != "" { // strip the casing from region and add to key.
  138. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  139. c.UsesRegion = true
  140. }
  141. if p.AssetClass == "pv" {
  142. pvpricing[key] = &p
  143. c.PVMapField = p.InstanceIDField
  144. } else if p.AssetClass == "node" {
  145. pricing[key] = &p
  146. classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
  147. cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
  148. if err != nil {
  149. } else {
  150. if _, ok := nodeclasspricing[classKey]; ok {
  151. oldPrice := nodeclasspricing[classKey]
  152. oldCount := nodeclasscount[classKey]
  153. newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
  154. nodeclasscount[classKey] = newPrice
  155. nodeclasscount[classKey]++
  156. } else {
  157. nodeclasspricing[classKey] = cost
  158. nodeclasscount[classKey] = 1
  159. }
  160. }
  161. c.NodeMapField = p.InstanceIDField
  162. } else if p.AssetClass == "gpu" {
  163. gpupricing[key] = &p
  164. c.GPUMapFields = append(c.GPUMapFields, strings.ToLower(p.InstanceIDField))
  165. } else {
  166. log.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  167. pricing[key] = &p
  168. c.NodeMapField = p.InstanceIDField
  169. }
  170. }
  171. if len(pricing) > 0 {
  172. c.Pricing = pricing
  173. c.NodeClassPricing = nodeclasspricing
  174. c.NodeClassCount = nodeclasscount
  175. c.PricingPV = pvpricing
  176. c.GPUClassPricing = gpupricing
  177. } else {
  178. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  179. }
  180. return nil
  181. }
// csvKey identifies a node for CSV price lookups. GetKey builds it.
type csvKey struct {
	// Labels are the node labels used to derive the class key and GPU type.
	Labels map[string]string
	// ProviderID is the lookup ID returned by ID().
	ProviderID string
	// GPULabel lists label names probed, in order, by GPUType.
	GPULabel []string
	// GPU is the GPU count returned by GPUCount.
	GPU int64
}
  188. func (k *csvKey) Features() string {
  189. instanceType, _ := util.GetInstanceType(k.Labels)
  190. region, _ := util.GetRegion(k.Labels)
  191. class := "node"
  192. return region + "," + instanceType + "," + class
  193. }
  194. func (k *csvKey) GPUCount() int {
  195. return int(k.GPU)
  196. }
  197. func (k *csvKey) GPUType() string {
  198. for _, label := range k.GPULabel {
  199. if val, ok := k.Labels[label]; ok {
  200. return val
  201. }
  202. }
  203. return ""
  204. }
  205. func (k *csvKey) ID() string {
  206. return k.ProviderID
  207. }
  208. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  209. c.DownloadPricingDataLock.RLock()
  210. defer c.DownloadPricingDataLock.RUnlock()
  211. var node *Node
  212. if p, ok := c.Pricing[key.ID()]; ok {
  213. node = &Node{
  214. Cost: p.MarketPriceHourly,
  215. PricingType: CsvExact,
  216. }
  217. }
  218. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  219. if len(s) == 2 {
  220. if p, ok := c.Pricing[s[1]]; ok {
  221. node = &Node{
  222. Cost: p.MarketPriceHourly,
  223. PricingType: CsvExact,
  224. }
  225. }
  226. }
  227. classKey := key.Features() // Use node attributes to try and do a class match
  228. if cost, ok := c.NodeClassPricing[classKey]; ok {
  229. log.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
  230. node = &Node{
  231. Cost: fmt.Sprintf("%f", cost),
  232. PricingType: CsvClass,
  233. }
  234. }
  235. if node != nil {
  236. if t := key.GPUType(); t != "" {
  237. t = strings.ToLower(t)
  238. count := key.GPUCount()
  239. node.GPU = strconv.Itoa(count)
  240. hourly := 0.0
  241. if p, ok := c.GPUClassPricing[t]; ok {
  242. hourly, _ = strconv.ParseFloat(p.MarketPriceHourly, 64)
  243. }
  244. totalCost := hourly * float64(count)
  245. node.GPUCost = fmt.Sprintf("%f", totalCost)
  246. nc, _ := strconv.ParseFloat(node.Cost, 64)
  247. node.Cost = fmt.Sprintf("%f", nc+totalCost)
  248. }
  249. return node, nil
  250. } else {
  251. return nil, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
  252. }
  253. }
  254. func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
  255. mf := strings.Split(m, ".")
  256. toReturn := ""
  257. if useRegion {
  258. if region, ok := util.GetRegion(n.Labels); ok {
  259. toReturn = region + ","
  260. } else {
  261. log.Errorf("Getting region based on labels failed")
  262. }
  263. }
  264. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  265. for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
  266. if matchNum == 2 {
  267. return toReturn + group
  268. }
  269. }
  270. if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
  271. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.Spec.ProviderID, "azure://"))
  272. return toReturn + vmOrScaleSet
  273. }
  274. return toReturn + n.Spec.ProviderID
  275. } else if len(mf) > 1 && mf[0] == "metadata" {
  276. if mf[1] == "name" {
  277. return toReturn + n.Name
  278. } else if mf[1] == "labels" {
  279. lkey := strings.Join(mf[2:len(mf)], "")
  280. return toReturn + n.Labels[lkey]
  281. } else if mf[1] == "annotations" {
  282. akey := strings.Join(mf[2:len(mf)], "")
  283. return toReturn + n.Annotations[akey]
  284. } else {
  285. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  286. return ""
  287. }
  288. } else {
  289. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  290. return ""
  291. }
  292. }
  293. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  294. mf := strings.Split(m, ".")
  295. if len(mf) > 1 && mf[0] == "metadata" {
  296. if mf[1] == "name" {
  297. return n.Name
  298. } else if mf[1] == "labels" {
  299. lkey := strings.Join(mf[2:len(mf)], "")
  300. return n.Labels[lkey]
  301. } else if mf[1] == "annotations" {
  302. akey := strings.Join(mf[2:len(mf)], "")
  303. return n.Annotations[akey]
  304. } else {
  305. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  306. return ""
  307. }
  308. } else if len(mf) > 2 && mf[0] == "spec" {
  309. if mf[1] == "capacity" && mf[2] == "storage" {
  310. skey := n.Spec.Capacity["storage"]
  311. return skey.String()
  312. } else {
  313. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  314. return ""
  315. }
  316. } else {
  317. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  318. return ""
  319. }
  320. }
  321. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  322. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  323. var gpuCount int64
  324. gpuCount = 0
  325. if gpuc, ok := n.Status.Capacity["nvidia.com/gpu"]; ok { // TODO: support non-nvidia GPUs
  326. gpuCount = gpuc.Value()
  327. }
  328. return &csvKey{
  329. ProviderID: id,
  330. Labels: l,
  331. GPULabel: c.GPUMapFields,
  332. GPU: gpuCount,
  333. }
  334. }
// csvPVKey identifies a persistent volume for CSV price lookups. GetPVKey
// builds it.
type csvPVKey struct {
	Labels map[string]string
	// ProviderID is the lookup value returned by Features().
	ProviderID string
	// StorageClassName is returned by GetStorageClass().
	StorageClassName       string
	StorageClassParameters map[string]string
	Name                   string
	DefaultRegion          string
}
  343. func (key *csvPVKey) ID() string {
  344. return ""
  345. }
  346. func (key *csvPVKey) GetStorageClass() string {
  347. return key.StorageClassName
  348. }
  349. func (key *csvPVKey) Features() string {
  350. return key.ProviderID
  351. }
  352. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  353. id := PVValueFromMapField(c.PVMapField, pv)
  354. return &csvPVKey{
  355. Labels: pv.Labels,
  356. ProviderID: id,
  357. StorageClassName: pv.Spec.StorageClassName,
  358. StorageClassParameters: parameters,
  359. Name: pv.Name,
  360. DefaultRegion: defaultRegion,
  361. }
  362. }
  363. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  364. c.DownloadPricingDataLock.RLock()
  365. defer c.DownloadPricingDataLock.RUnlock()
  366. pricing, ok := c.PricingPV[pvk.Features()]
  367. if !ok {
  368. log.Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  369. return &PV{}, nil
  370. }
  371. return &PV{
  372. Cost: pricing.MarketPriceHourly,
  373. }, nil
  374. }
  375. func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
  376. return &ServiceAccountStatus{
  377. Checks: []*ServiceAccountCheck{},
  378. }
  379. }
  380. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  381. return "", 0.0, nil
  382. }
  383. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  384. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  385. }
  386. func (c *CSVProvider) Regions() []string {
  387. return []string{}
  388. }