pricesheetdownloader.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package azure
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/csv"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "os"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/Azure/azure-sdk-for-go/profiles/2020-09-01/commerce/mgmt/commerce"
  15. "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
  16. "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
  17. "github.com/opencost/opencost/core/pkg/log"
  18. "github.com/opencost/opencost/pkg/cloud/httputil"
  19. )
  20. type PriceSheetDownloader struct {
  21. TenantID string
  22. ClientID string
  23. ClientSecret string
  24. BillingAccount string
  25. OfferID string
  26. ConvertMeterInfo func(info commerce.MeterInfo) (map[string]*AzurePricing, error)
  27. }
  28. func (d *PriceSheetDownloader) GetPricing(ctx context.Context) (map[string]*AzurePricing, error) {
  29. log.Infof("requesting pricesheet download link")
  30. url, err := d.getDownloadURL(ctx)
  31. if err != nil {
  32. return nil, fmt.Errorf("getting download URL: %w", err)
  33. }
  34. log.Infof("downloading pricesheet from %q", url)
  35. data, err := d.saveData(ctx, url, "pricesheet")
  36. if err != nil {
  37. return nil, fmt.Errorf("saving pricesheet from %q: %w", url, err)
  38. }
  39. defer data.Close()
  40. prices, err := d.readPricesheet(ctx, data)
  41. if err != nil {
  42. return nil, fmt.Errorf("reading pricesheet: %w", err)
  43. }
  44. log.Infof("loaded %d pricings from pricesheet", len(prices))
  45. return prices, nil
  46. }
  47. func (d *PriceSheetDownloader) getDownloadURL(ctx context.Context) (string, error) {
  48. cred, err := azidentity.NewClientSecretCredential(d.TenantID, d.ClientID, d.ClientSecret, nil)
  49. if err != nil {
  50. return "", fmt.Errorf("creating credential: %w", err)
  51. }
  52. client, err := NewPriceSheetClient(d.BillingAccount, cred, nil)
  53. if err != nil {
  54. return "", fmt.Errorf("creating pricesheet client: %w", err)
  55. }
  56. poller, err := client.BeginDownloadByBillingPeriod(ctx, currentBillingPeriod())
  57. if err != nil {
  58. return "", fmt.Errorf("beginning pricesheet download: %w", err)
  59. }
  60. resp, err := poller.PollUntilDone(ctx, &runtime.PollUntilDoneOptions{
  61. Frequency: 30 * time.Second,
  62. })
  63. if err != nil {
  64. return "", fmt.Errorf("polling for pricesheet: %w", err)
  65. }
  66. return resp.Properties.DownloadURL, nil
  67. }
  68. func (d PriceSheetDownloader) saveData(ctx context.Context, url, tempName string) (io.ReadCloser, error) {
  69. // Download file from URL in response.
  70. out, err := os.CreateTemp("", tempName)
  71. if err != nil {
  72. return nil, fmt.Errorf("creating %s temp file: %w", tempName, err)
  73. }
  74. // The price sheet can be large, so the streaming client bounds connect/TLS/
  75. // response-header time but not the body read, avoiding truncation of a slow
  76. // download. Pass the caller's context so the download is cancelable.
  77. resp, err := httputil.StreamingGet(ctx, url)
  78. if err != nil {
  79. return nil, fmt.Errorf("downloading: %w", err)
  80. }
  81. defer resp.Body.Close()
  82. if resp.StatusCode != http.StatusOK {
  83. return nil, fmt.Errorf("unexpected HTTP status %d", resp.StatusCode)
  84. }
  85. if _, err := io.Copy(out, resp.Body); err != nil {
  86. return nil, fmt.Errorf("reading response: %w", err)
  87. }
  88. _, err = out.Seek(0, io.SeekStart)
  89. if err != nil {
  90. return nil, fmt.Errorf("seeking to start of file: %w", err)
  91. }
  92. return &removeOnClose{File: out}, nil
  93. }
  // removeOnClose wraps a file so that closing it also deletes it from disk.
  type removeOnClose struct {
  	*os.File
  }

  // Close closes the underlying file and then removes it.
  //
  // Removal is attempted even when closing fails, so a close error cannot
  // leave the file behind. A close error takes precedence in the returned
  // value; otherwise the result of the removal is returned.
  func (r *removeOnClose) Close() error {
  	closeErr := r.File.Close()
  	removeErr := os.Remove(r.Name())
  	if closeErr != nil {
  		return closeErr
  	}
  	return removeErr
  }
  104. func (d *PriceSheetDownloader) readPricesheet(ctx context.Context, data io.Reader) (map[string]*AzurePricing, error) {
  105. // Avoid double-buffering.
  106. buf, ok := (data).(*bufio.Reader)
  107. if !ok {
  108. buf = bufio.NewReader(data)
  109. }
  110. // The CSV file starts with two lines before the header without
  111. // commas (so different numbers of fields as far as the CSV parser
  112. // is concerned). Skip them before making the CSV reader so we
  113. // still get the benefit of the row length checks after the
  114. // header.
  115. for i := 0; i < 2; i++ {
  116. _, err := buf.ReadBytes('\n')
  117. if err != nil {
  118. return nil, fmt.Errorf("skipping preamble line %d: %w", i, err)
  119. }
  120. }
  121. reader := csv.NewReader(buf)
  122. reader.ReuseRecord = true
  123. header, err := reader.Read()
  124. if err != nil {
  125. return nil, fmt.Errorf("reading header: %w", err)
  126. }
  127. if err := checkPricesheetHeader(header); err != nil {
  128. return nil, err
  129. }
  130. units := make(map[string]bool)
  131. results := make(map[string]*AzurePricing)
  132. lines := 2
  133. for {
  134. row, err := reader.Read()
  135. if err == io.EOF {
  136. break
  137. }
  138. lines++
  139. if err != nil {
  140. return nil, fmt.Errorf("reading line %d: %w", lines, err)
  141. }
  142. // Skip savings plan - we should be reporting based on the
  143. // consumption price because we don't know whether the user is
  144. // using a savings plan or over their threshold.
  145. if row[pricesheetPriceType] == "Savings Plan" || row[pricesheetOfferID] != d.OfferID {
  146. continue
  147. }
  148. // TODO: Creating a meter info for each record will cause a
  149. // lot of GC churn - is it worth reusing one meter info instead?
  150. meterInfo, err := makeMeterInfo(row)
  151. if err != nil {
  152. log.Warnf("making meter info (line %d): %v", lines, err)
  153. continue
  154. }
  155. pricings, err := d.ConvertMeterInfo(meterInfo)
  156. if err != nil {
  157. log.Warnf("converting meter to pricings (line %d): %v", lines, err)
  158. continue
  159. }
  160. if pricings != nil {
  161. units[*meterInfo.Unit] = true
  162. }
  163. for key, pricing := range pricings {
  164. results[key] = pricing
  165. }
  166. }
  167. if len(results) == 0 {
  168. return nil, fmt.Errorf("no matching pricing from price sheet")
  169. }
  170. // Keep track of units seen so we can detect if there are any that
  171. // need handling.
  172. allUnits := make([]string, 0, len(units))
  173. for unit := range units {
  174. allUnits = append(allUnits, unit)
  175. }
  176. sort.Strings(allUnits)
  177. log.Infof("all units in pricesheet: %s", strings.Join(allUnits, ", "))
  178. return results, nil
  179. }
  180. func checkPricesheetHeader(header []string) error {
  181. if len(header) < len(pricesheetCols) {
  182. return fmt.Errorf("too few header columns: got %d, expected %d", len(header), len(pricesheetCols))
  183. }
  184. for col, name := range pricesheetCols {
  185. if !strings.EqualFold(header[col], name) {
  186. return fmt.Errorf("unexpected header at col %d %q, expected %q", col, header[col], name)
  187. }
  188. }
  189. return nil
  190. }
  191. func makeMeterInfo(row []string) (commerce.MeterInfo, error) {
  192. price, err := strconv.ParseFloat(row[pricesheetUnitPrice], 64)
  193. if err != nil {
  194. return commerce.MeterInfo{}, fmt.Errorf("parsing unit price: %w", err)
  195. }
  196. newPrice, unit := normalisePrice(price, row[pricesheetUnit])
  197. return commerce.MeterInfo{
  198. MeterName: ptr(row[pricesheetMeterName]),
  199. MeterCategory: ptr(row[pricesheetMeterCategory]),
  200. MeterSubCategory: ptr(row[pricesheetMeterSubCategory]),
  201. Unit: &unit,
  202. MeterRegion: ptr(row[pricesheetMeterRegion]),
  203. MeterRates: map[string]*float64{"0": &newPrice},
  204. }, nil
  205. }
  // pricesheetCols lists, in order, the column names the price sheet header
  // is expected to start with. The position of a name in this slice is the
  // index of that column in every row.
  var pricesheetCols = []string{
  	"Meter ID",           // 0
  	"Meter name",         // 1
  	"Meter category",     // 2
  	"Meter sub-category", // 3
  	"Meter region",       // 4
  	"Unit",               // 5
  	"Unit of measure",    // 6
  	"Part number",        // 7
  	"Unit price",         // 8
  	"Currency code",      // 9
  	"Included quantity",  // 10
  	"Offer Id",           // 11
  	"Term",               // 12
  	"Price type",         // 13
  }
  // Indexes of the price sheet columns that are read from each row. They
  // follow the column order of the price sheet header; columns that are never
  // read are skipped with a blank identifier so the numbering stays aligned.
  const (
  	pricesheetMeterID          = iota // 0: "Meter ID"
  	pricesheetMeterName               // 1: "Meter name"
  	pricesheetMeterCategory           // 2: "Meter category"
  	pricesheetMeterSubCategory        // 3: "Meter sub-category"
  	pricesheetMeterRegion             // 4: "Meter region"
  	pricesheetUnit                    // 5: "Unit"
  	_                                 // 6: "Unit of measure"
  	_                                 // 7: "Part number"
  	pricesheetUnitPrice               // 8: "Unit price"
  	pricesheetCurrencyCode            // 9: "Currency code"
  	_                                 // 10: "Included quantity"
  	pricesheetOfferID                 // 11: "Offer Id"
  	_                                 // 12: "Term"
  	pricesheetPriceType               // 13: "Price type"
  )
  // currentBillingPeriod returns the current local date formatted as a
  // four-digit year followed by a two-digit month, e.g. "202403".
  func currentBillingPeriod() string {
  	const yearMonth = "200601"
  	now := time.Now()
  	return now.Format(yearMonth)
  }
  // ptr returns a pointer to a fresh copy of v. Each call yields a distinct
  // pointer, so changing the pointed-to value never affects the caller's v.
  func ptr[T any](v T) *T {
  	p := new(T)
  	*p = v
  	return p
  }
  // conversions maps each unit seen in the price sheet (for the prices we are
  // interested in) to the corresponding rate card unit, together with the
  // divisor that scales a price sheet price down to that unit.
  //
  // NOTE(review): the PiB and TiB rows use decimal factors (1,000,000 and
  // 100,000) although the unit names are binary (1 PiB = 1,048,576 GiB,
  // 100 TiB = 102,400 GiB) - confirm this matches how the prices are quoted.
  var conversions = map[string]struct {
  	divisor float64
  	unit    string
  }{
  	// Per hour.
  	"1 Hour":      {divisor: 1, unit: "1 Hour"},
  	"10 Hours":    {divisor: 10, unit: "1 Hour"},
  	"100 Hours":   {divisor: 100, unit: "1 Hour"},
  	"1000 Hours":  {divisor: 1000, unit: "1 Hour"},
  	"10000 Hours": {divisor: 10_000, unit: "1 Hour"},

  	// Count per hour.
  	"100000 /Hour":   {divisor: 100_000, unit: "1 /Hour"},
  	"1000000 /Hour":  {divisor: 1_000_000, unit: "1 /Hour"},
  	"10000000 /Hour": {divisor: 10_000_000, unit: "1 /Hour"},

  	// Count per month.
  	"1 /Month":   {divisor: 1, unit: "1 /Month"},
  	"10 /Month":  {divisor: 10, unit: "1 /Month"},
  	"100 /Month": {divisor: 100, unit: "1 /Month"},

  	// Capacity per month.
  	"100 GB/Month": {divisor: 100, unit: "1 GB/Month"},

  	// Capacity per hour.
  	"100 TiB/Hour": {divisor: 100_000, unit: "1 GiB/Hour"},
  	"1 PiB/Hour":   {divisor: 1_000_000, unit: "1 GiB/Hour"},
  }
  262. func normalisePrice(price float64, unit string) (float64, string) {
  263. if conv, ok := conversions[unit]; ok {
  264. return price / conv.divisor, conv.unit
  265. }
  266. return price, unit
  267. }