csvprovider.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. package cloud
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/opencost/opencost/pkg/env"
  12. "github.com/opencost/opencost/pkg/util"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/session"
  15. "github.com/aws/aws-sdk-go/service/s3"
  16. "github.com/opencost/opencost/pkg/log"
  17. v1 "k8s.io/api/core/v1"
  18. "github.com/jszwec/csvutil"
  19. )
  20. const refreshMinutes = 60
  21. type CSVProvider struct {
  22. *CustomProvider
  23. CSVLocation string
  24. Pricing map[string]*price
  25. NodeClassPricing map[string]float64
  26. NodeClassCount map[string]float64
  27. NodeMapField string
  28. PricingPV map[string]*price
  29. PVMapField string
  30. GPUClassPricing map[string]*price
  31. GPUMapField string
  32. UsesRegion bool
  33. DownloadPricingDataLock sync.RWMutex
  34. }
  35. type price struct {
  36. EndTimestamp string `csv:"EndTimestamp"`
  37. InstanceID string `csv:"InstanceID"`
  38. Region string `csv:"Region"`
  39. AssetClass string `csv:"AssetClass"`
  40. InstanceIDField string `csv:"InstanceIDField"`
  41. InstanceType string `csv:"InstanceType"`
  42. MarketPriceHourly string `csv:"MarketPriceHourly"`
  43. Version string `csv:"Version"`
  44. }
  45. func GetCsv(location string) (io.Reader, error) {
  46. return os.Open(location)
  47. }
  48. func (c *CSVProvider) DownloadPricingData() error {
  49. c.DownloadPricingDataLock.Lock()
  50. defer time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
  51. defer c.DownloadPricingDataLock.Unlock()
  52. pricing := make(map[string]*price)
  53. nodeclasspricing := make(map[string]float64)
  54. nodeclasscount := make(map[string]float64)
  55. pvpricing := make(map[string]*price)
  56. gpupricing := make(map[string]*price)
  57. header, err := csvutil.Header(price{}, "csv")
  58. if err != nil {
  59. return err
  60. }
  61. fieldsPerRecord := len(header)
  62. var csvr io.Reader
  63. var csverr error
  64. if strings.HasPrefix(c.CSVLocation, "s3://") {
  65. region := env.GetCSVRegion()
  66. conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
  67. endpoint := env.GetCSVEndpoint()
  68. if endpoint != "" {
  69. conf = conf.WithEndpoint(endpoint)
  70. }
  71. s3Client := s3.New(session.New(conf))
  72. bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
  73. if len(bucketAndKey) == 2 {
  74. out, err := s3Client.GetObject(&s3.GetObjectInput{
  75. Bucket: aws.String(bucketAndKey[0]),
  76. Key: aws.String(bucketAndKey[1]),
  77. })
  78. csverr = err
  79. csvr = out.Body
  80. } else {
  81. c.Pricing = pricing
  82. c.NodeClassPricing = nodeclasspricing
  83. c.NodeClassCount = nodeclasscount
  84. c.PricingPV = pvpricing
  85. c.GPUClassPricing = gpupricing
  86. return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
  87. }
  88. } else {
  89. csvr, csverr = GetCsv(c.CSVLocation)
  90. }
  91. if csverr != nil {
  92. log.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
  93. c.Pricing = pricing
  94. c.NodeClassPricing = nodeclasspricing
  95. c.NodeClassCount = nodeclasscount
  96. c.PricingPV = pvpricing
  97. c.GPUClassPricing = gpupricing
  98. return nil
  99. }
  100. csvReader := csv.NewReader(csvr)
  101. csvReader.Comma = ','
  102. csvReader.FieldsPerRecord = fieldsPerRecord
  103. dec, err := csvutil.NewDecoder(csvReader, header...)
  104. if err != nil {
  105. c.Pricing = pricing
  106. c.NodeClassPricing = nodeclasspricing
  107. c.NodeClassCount = nodeclasscount
  108. c.PricingPV = pvpricing
  109. c.GPUClassPricing = gpupricing
  110. return err
  111. }
  112. for {
  113. p := price{}
  114. err := dec.Decode(&p)
  115. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  116. if err == io.EOF {
  117. break
  118. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  119. rec := dec.Record()
  120. if len(rec) != 1 {
  121. log.Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  122. continue
  123. }
  124. if strings.Index(rec[0], "#") == 0 {
  125. continue
  126. } else {
  127. log.Infof("skipping non-CSV line: %s", rec)
  128. continue
  129. }
  130. } else if err != nil {
  131. log.Infof("Error during spot info decode: %+v", err)
  132. continue
  133. }
  134. log.Infof("Found price info %+v", p)
  135. key := strings.ToLower(p.InstanceID)
  136. if p.Region != "" { // strip the casing from region and add to key.
  137. key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
  138. c.UsesRegion = true
  139. }
  140. if p.AssetClass == "pv" {
  141. pvpricing[key] = &p
  142. c.PVMapField = p.InstanceIDField
  143. } else if p.AssetClass == "node" {
  144. pricing[key] = &p
  145. classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
  146. cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
  147. if err != nil {
  148. } else {
  149. if _, ok := nodeclasspricing[classKey]; ok {
  150. oldPrice := nodeclasspricing[classKey]
  151. oldCount := nodeclasscount[classKey]
  152. newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
  153. nodeclasscount[classKey] = newPrice
  154. nodeclasscount[classKey]++
  155. } else {
  156. nodeclasspricing[classKey] = cost
  157. nodeclasscount[classKey] = 1
  158. }
  159. }
  160. c.NodeMapField = p.InstanceIDField
  161. } else if p.AssetClass == "gpu" {
  162. gpupricing[key] = &p
  163. c.GPUMapField = strings.ToLower(p.InstanceIDField)
  164. } else {
  165. log.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
  166. pricing[key] = &p
  167. c.NodeMapField = p.InstanceIDField
  168. }
  169. }
  170. if len(pricing) > 0 {
  171. c.Pricing = pricing
  172. c.NodeClassPricing = nodeclasspricing
  173. c.NodeClassCount = nodeclasscount
  174. c.PricingPV = pvpricing
  175. c.GPUClassPricing = gpupricing
  176. } else {
  177. log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
  178. }
  179. return nil
  180. }
  181. type csvKey struct {
  182. Labels map[string]string
  183. ProviderID string
  184. GPULabel string
  185. GPU int64
  186. }
  187. func (k *csvKey) Features() string {
  188. instanceType, _ := util.GetInstanceType(k.Labels)
  189. region, _ := util.GetRegion(k.Labels)
  190. class := "node"
  191. return region + "," + instanceType + "," + class
  192. }
  193. func (k *csvKey) GPUCount() int {
  194. return int(k.GPU)
  195. }
  196. func (k *csvKey) GPUType() string {
  197. if val, ok := k.Labels[k.GPULabel]; ok {
  198. return val
  199. }
  200. return ""
  201. }
  202. func (k *csvKey) ID() string {
  203. return k.ProviderID
  204. }
  205. func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
  206. c.DownloadPricingDataLock.RLock()
  207. defer c.DownloadPricingDataLock.RUnlock()
  208. var node *Node
  209. if p, ok := c.Pricing[key.ID()]; ok {
  210. node = &Node{
  211. Cost: p.MarketPriceHourly,
  212. PricingType: CsvExact,
  213. }
  214. }
  215. s := strings.Split(key.ID(), ",") // Try without a region to be sure
  216. if len(s) == 2 {
  217. if p, ok := c.Pricing[s[1]]; ok {
  218. node = &Node{
  219. Cost: p.MarketPriceHourly,
  220. PricingType: CsvExact,
  221. }
  222. }
  223. }
  224. classKey := key.Features() // Use node attributes to try and do a class match
  225. if cost, ok := c.NodeClassPricing[classKey]; ok {
  226. log.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
  227. node = &Node{
  228. Cost: fmt.Sprintf("%f", cost),
  229. PricingType: CsvClass,
  230. }
  231. }
  232. if node != nil {
  233. if t := key.GPUType(); t != "" {
  234. t = strings.ToLower(t)
  235. count := key.GPUCount()
  236. node.GPU = strconv.Itoa(count)
  237. hourly := 0.0
  238. if p, ok := c.GPUClassPricing[t]; ok {
  239. hourly, _ = strconv.ParseFloat(p.MarketPriceHourly, 64)
  240. }
  241. totalCost := hourly * float64(count)
  242. node.GPUCost = fmt.Sprintf("%f", totalCost)
  243. nc, _ := strconv.ParseFloat(node.Cost, 64)
  244. node.Cost = fmt.Sprintf("%f", nc+totalCost)
  245. }
  246. return node, nil
  247. } else {
  248. return nil, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
  249. }
  250. }
  251. func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
  252. mf := strings.Split(m, ".")
  253. toReturn := ""
  254. if useRegion {
  255. if region, ok := util.GetRegion(n.Labels); ok {
  256. toReturn = region + ","
  257. } else {
  258. log.Errorf("Getting region based on labels failed")
  259. }
  260. }
  261. if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
  262. for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
  263. if matchNum == 2 {
  264. return toReturn + group
  265. }
  266. }
  267. if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
  268. vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.Spec.ProviderID, "azure://"))
  269. return toReturn + vmOrScaleSet
  270. }
  271. return toReturn + n.Spec.ProviderID
  272. } else if len(mf) > 1 && mf[0] == "metadata" {
  273. if mf[1] == "name" {
  274. return toReturn + n.Name
  275. } else if mf[1] == "labels" {
  276. lkey := strings.Join(mf[2:len(mf)], "")
  277. return toReturn + n.Labels[lkey]
  278. } else if mf[1] == "annotations" {
  279. akey := strings.Join(mf[2:len(mf)], "")
  280. return toReturn + n.Annotations[akey]
  281. } else {
  282. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  283. return ""
  284. }
  285. } else {
  286. log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
  287. return ""
  288. }
  289. }
  290. func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
  291. mf := strings.Split(m, ".")
  292. if len(mf) > 1 && mf[0] == "metadata" {
  293. if mf[1] == "name" {
  294. return n.Name
  295. } else if mf[1] == "labels" {
  296. lkey := strings.Join(mf[2:len(mf)], "")
  297. return n.Labels[lkey]
  298. } else if mf[1] == "annotations" {
  299. akey := strings.Join(mf[2:len(mf)], "")
  300. return n.Annotations[akey]
  301. } else {
  302. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  303. return ""
  304. }
  305. } else if len(mf) > 2 && mf[0] == "spec" {
  306. if mf[1] == "capacity" && mf[2] == "storage" {
  307. skey := n.Spec.Capacity["storage"]
  308. return skey.String()
  309. } else {
  310. log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
  311. return ""
  312. }
  313. } else {
  314. log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
  315. return ""
  316. }
  317. }
  318. func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
  319. id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
  320. var gpuCount int64
  321. gpuCount = 0
  322. if gpuc, ok := n.Status.Capacity["nvidia.com/gpu"]; ok { // TODO: support non-nvidia GPUs
  323. gpuCount = gpuc.Value()
  324. }
  325. return &csvKey{
  326. ProviderID: id,
  327. Labels: l,
  328. GPULabel: c.GPUMapField,
  329. GPU: gpuCount,
  330. }
  331. }
  332. type csvPVKey struct {
  333. Labels map[string]string
  334. ProviderID string
  335. StorageClassName string
  336. StorageClassParameters map[string]string
  337. Name string
  338. DefaultRegion string
  339. }
  340. func (key *csvPVKey) ID() string {
  341. return ""
  342. }
  343. func (key *csvPVKey) GetStorageClass() string {
  344. return key.StorageClassName
  345. }
  346. func (key *csvPVKey) Features() string {
  347. return key.ProviderID
  348. }
  349. func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  350. id := PVValueFromMapField(c.PVMapField, pv)
  351. return &csvPVKey{
  352. Labels: pv.Labels,
  353. ProviderID: id,
  354. StorageClassName: pv.Spec.StorageClassName,
  355. StorageClassParameters: parameters,
  356. Name: pv.Name,
  357. DefaultRegion: defaultRegion,
  358. }
  359. }
  360. func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
  361. c.DownloadPricingDataLock.RLock()
  362. defer c.DownloadPricingDataLock.RUnlock()
  363. pricing, ok := c.PricingPV[pvk.Features()]
  364. if !ok {
  365. log.Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  366. return &PV{}, nil
  367. }
  368. return &PV{
  369. Cost: pricing.MarketPriceHourly,
  370. }, nil
  371. }
  372. func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
  373. return &ServiceAccountStatus{
  374. Checks: []*ServiceAccountCheck{},
  375. }
  376. }
  377. func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
  378. return "", 0.0, nil
  379. }
  380. func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
  381. return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
  382. }
  383. func (c *CSVProvider) Regions() []string {
  384. return []string{}
  385. }