| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- package cloud
- import (
- "encoding/csv"
- "fmt"
- "io"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/opencost/opencost/pkg/env"
- "github.com/opencost/opencost/pkg/util"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/s3"
- "github.com/opencost/opencost/pkg/log"
- v1 "k8s.io/api/core/v1"
- "github.com/jszwec/csvutil"
- )
- const refreshMinutes = 60
- type CSVProvider struct {
- *CustomProvider
- CSVLocation string
- Pricing map[string]*price
- NodeClassPricing map[string]float64
- NodeClassCount map[string]float64
- NodeMapField string
- PricingPV map[string]*price
- PVMapField string
- UsesRegion bool
- DownloadPricingDataLock sync.RWMutex
- }
- type price struct {
- EndTimestamp string `csv:"EndTimestamp"`
- InstanceID string `csv:"InstanceID"`
- Region string `csv:"Region"`
- AssetClass string `csv:"AssetClass"`
- InstanceIDField string `csv:"InstanceIDField"`
- InstanceType string `csv:"InstanceType"`
- MarketPriceHourly string `csv:"MarketPriceHourly"`
- Version string `csv:"Version"`
- }
- func GetCsv(location string) (io.Reader, error) {
- return os.Open(location)
- }
- func (c *CSVProvider) DownloadPricingData() error {
- c.DownloadPricingDataLock.Lock()
- defer time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
- defer c.DownloadPricingDataLock.Unlock()
- pricing := make(map[string]*price)
- nodeclasspricing := make(map[string]float64)
- nodeclasscount := make(map[string]float64)
- pvpricing := make(map[string]*price)
- header, err := csvutil.Header(price{}, "csv")
- if err != nil {
- return err
- }
- fieldsPerRecord := len(header)
- var csvr io.Reader
- var csverr error
- if strings.HasPrefix(c.CSVLocation, "s3://") {
- region := env.GetCSVRegion()
- conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
- endpoint := env.GetCSVEndpoint()
- if endpoint != "" {
- conf = conf.WithEndpoint(endpoint)
- }
- s3Client := s3.New(session.New(conf))
- bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
- if len(bucketAndKey) == 2 {
- out, err := s3Client.GetObject(&s3.GetObjectInput{
- Bucket: aws.String(bucketAndKey[0]),
- Key: aws.String(bucketAndKey[1]),
- })
- csverr = err
- csvr = out.Body
- } else {
- c.Pricing = pricing
- c.NodeClassPricing = nodeclasspricing
- c.NodeClassCount = nodeclasscount
- c.PricingPV = pvpricing
- return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
- }
- } else {
- csvr, csverr = GetCsv(c.CSVLocation)
- }
- if csverr != nil {
- log.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
- c.Pricing = pricing
- c.NodeClassPricing = nodeclasspricing
- c.NodeClassCount = nodeclasscount
- c.PricingPV = pvpricing
- return nil
- }
- csvReader := csv.NewReader(csvr)
- csvReader.Comma = ','
- csvReader.FieldsPerRecord = fieldsPerRecord
- dec, err := csvutil.NewDecoder(csvReader, header...)
- if err != nil {
- c.Pricing = pricing
- c.NodeClassPricing = nodeclasspricing
- c.NodeClassCount = nodeclasscount
- c.PricingPV = pvpricing
- return err
- }
- for {
- p := price{}
- err := dec.Decode(&p)
- csvParseErr, isCsvParseErr := err.(*csv.ParseError)
- if err == io.EOF {
- break
- } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
- rec := dec.Record()
- if len(rec) != 1 {
- log.Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
- continue
- }
- if strings.Index(rec[0], "#") == 0 {
- continue
- } else {
- log.Infof("skipping non-CSV line: %s", rec)
- continue
- }
- } else if err != nil {
- log.Infof("Error during spot info decode: %+v", err)
- continue
- }
- log.Infof("Found price info %+v", p)
- key := strings.ToLower(p.InstanceID)
- if p.Region != "" { // strip the casing from region and add to key.
- key = fmt.Sprintf("%s,%s", strings.ToLower(p.Region), strings.ToLower(p.InstanceID))
- c.UsesRegion = true
- }
- if p.AssetClass == "pv" {
- pvpricing[key] = &p
- c.PVMapField = p.InstanceIDField
- } else if p.AssetClass == "node" {
- pricing[key] = &p
- classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
- cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
- if err != nil {
- } else {
- if _, ok := nodeclasspricing[classKey]; ok {
- oldPrice := nodeclasspricing[classKey]
- oldCount := nodeclasscount[classKey]
- newPrice := ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
- nodeclasscount[classKey] = newPrice
- nodeclasscount[classKey]++
- } else {
- nodeclasspricing[classKey] = cost
- nodeclasscount[classKey] = 1
- }
- }
- c.NodeMapField = p.InstanceIDField
- } else {
- log.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
- pricing[key] = &p
- c.NodeMapField = p.InstanceIDField
- }
- }
- if len(pricing) > 0 {
- c.Pricing = pricing
- c.NodeClassPricing = nodeclasspricing
- c.NodeClassCount = nodeclasscount
- c.PricingPV = pvpricing
- } else {
- log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
- }
- return nil
- }
- type csvKey struct {
- Labels map[string]string
- ProviderID string
- }
- func (k *csvKey) Features() string {
- instanceType, _ := util.GetInstanceType(k.Labels)
- region, _ := util.GetRegion(k.Labels)
- class := "node"
- return region + "," + instanceType + "," + class
- }
- func (k *csvKey) GPUType() string {
- return ""
- }
- func (k *csvKey) ID() string {
- return k.ProviderID
- }
- func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
- c.DownloadPricingDataLock.RLock()
- defer c.DownloadPricingDataLock.RUnlock()
- if p, ok := c.Pricing[key.ID()]; ok {
- return &Node{
- Cost: p.MarketPriceHourly,
- PricingType: CsvExact,
- }, nil
- }
- s := strings.Split(key.ID(), ",") // Try without a region to be sure
- if len(s) == 2 {
- if p, ok := c.Pricing[s[1]]; ok {
- return &Node{
- Cost: p.MarketPriceHourly,
- PricingType: CsvExact,
- }, nil
- }
- }
- classKey := key.Features() // Use node attributes to try and do a class match
- if cost, ok := c.NodeClassPricing[classKey]; ok {
- log.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
- return &Node{
- Cost: fmt.Sprintf("%f", cost),
- PricingType: CsvClass,
- }, nil
- }
- return nil, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
- }
- func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {
- mf := strings.Split(m, ".")
- toReturn := ""
- if useRegion {
- if region, ok := util.GetRegion(n.Labels); ok {
- toReturn = region + ","
- } else {
- log.Errorf("Getting region based on labels failed")
- }
- }
- if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
- for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
- if matchNum == 2 {
- return toReturn + group
- }
- }
- if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
- vmOrScaleSet := strings.ToLower(strings.TrimPrefix(n.Spec.ProviderID, "azure://"))
- return toReturn + vmOrScaleSet
- }
- return toReturn + n.Spec.ProviderID
- } else if len(mf) > 1 && mf[0] == "metadata" {
- if mf[1] == "name" {
- return toReturn + n.Name
- } else if mf[1] == "labels" {
- lkey := strings.Join(mf[2:len(mf)], "")
- return toReturn + n.Labels[lkey]
- } else if mf[1] == "annotations" {
- akey := strings.Join(mf[2:len(mf)], "")
- return toReturn + n.Annotations[akey]
- } else {
- log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
- return ""
- }
- } else {
- log.Errorf("Unsupported InstanceIDField %s in CSV For Node", m)
- return ""
- }
- }
- func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
- mf := strings.Split(m, ".")
- if len(mf) > 1 && mf[0] == "metadata" {
- if mf[1] == "name" {
- return n.Name
- } else if mf[1] == "labels" {
- lkey := strings.Join(mf[2:len(mf)], "")
- return n.Labels[lkey]
- } else if mf[1] == "annotations" {
- akey := strings.Join(mf[2:len(mf)], "")
- return n.Annotations[akey]
- } else {
- log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
- return ""
- }
- } else if len(mf) > 2 && mf[0] == "spec" {
- if mf[1] == "capacity" && mf[2] == "storage" {
- skey := n.Spec.Capacity["storage"]
- return skey.String()
- } else {
- log.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
- return ""
- }
- } else {
- log.Errorf("Unsupported InstanceIDField %s in CSV For PV", m)
- return ""
- }
- }
- func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
- id := NodeValueFromMapField(c.NodeMapField, n, c.UsesRegion)
- return &csvKey{
- ProviderID: id,
- Labels: l,
- }
- }
- type csvPVKey struct {
- Labels map[string]string
- ProviderID string
- StorageClassName string
- StorageClassParameters map[string]string
- Name string
- DefaultRegion string
- }
- func (key *csvPVKey) ID() string {
- return ""
- }
- func (key *csvPVKey) GetStorageClass() string {
- return key.StorageClassName
- }
- func (key *csvPVKey) Features() string {
- return key.ProviderID
- }
- func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
- id := PVValueFromMapField(c.PVMapField, pv)
- return &csvPVKey{
- Labels: pv.Labels,
- ProviderID: id,
- StorageClassName: pv.Spec.StorageClassName,
- StorageClassParameters: parameters,
- Name: pv.Name,
- DefaultRegion: defaultRegion,
- }
- }
- func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
- c.DownloadPricingDataLock.RLock()
- defer c.DownloadPricingDataLock.RUnlock()
- pricing, ok := c.PricingPV[pvk.Features()]
- if !ok {
- log.Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
- return &PV{}, nil
- }
- return &PV{
- Cost: pricing.MarketPriceHourly,
- }, nil
- }
- func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
- return &ServiceAccountStatus{
- Checks: []*ServiceAccountCheck{},
- }
- }
- func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
- return "", 0.0, nil
- }
- func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
- return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
- }
- func (c *CSVProvider) Regions() []string {
- return []string{}
- }
|