storagebillingparser.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package azure
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/csv"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  12. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
  13. "github.com/opencost/opencost/pkg/cloud"
  14. "github.com/opencost/opencost/pkg/env"
  15. "github.com/opencost/opencost/pkg/log"
  16. )
  17. // AzureStorageBillingParser accesses billing data stored in CSV files in Azure Storage
  18. type AzureStorageBillingParser struct {
  19. StorageConnection
  20. }
  21. func (asbp *AzureStorageBillingParser) Equals(config cloud.Config) bool {
  22. thatConfig, ok := config.(*AzureStorageBillingParser)
  23. if !ok {
  24. return false
  25. }
  26. return asbp.StorageConnection.Equals(&thatConfig.StorageConnection)
  27. }
  28. type AzureBillingResultFunc func(*BillingRowValues) error
  29. func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, resultFn AzureBillingResultFunc) error {
  30. err := asbp.Validate()
  31. if err != nil {
  32. asbp.ConnectionStatus = cloud.InvalidConfiguration
  33. return err
  34. }
  35. serviceURL := fmt.Sprintf(asbp.StorageConnection.getBlobURLTemplate(), asbp.Account, "")
  36. client, err := asbp.Authorizer.GetBlobClient(serviceURL)
  37. if err != nil {
  38. asbp.ConnectionStatus = cloud.FailedConnection
  39. return err
  40. }
  41. ctx := context.Background()
  42. blobNames, err := asbp.getMostRecentBlobs(start, end, client, ctx)
  43. if err != nil {
  44. asbp.ConnectionStatus = cloud.FailedConnection
  45. return err
  46. }
  47. if len(blobNames) == 0 && asbp.ConnectionStatus != cloud.SuccessfulConnection {
  48. asbp.ConnectionStatus = cloud.MissingData
  49. return nil
  50. }
  51. for _, blobName := range blobNames {
  52. if env.IsAzureParseBillingPaginated() {
  53. localFilepath := "/var/configs/db/cloudCost/azurebilling.csv"
  54. err := asbp.DownloadBlobToFile(localFilepath, blobName, client, ctx)
  55. if err != nil {
  56. asbp.ConnectionStatus = cloud.FailedConnection
  57. return err
  58. }
  59. file, err := os.Open(localFilepath)
  60. if err != nil {
  61. asbp.ConnectionStatus = cloud.FailedConnection
  62. return err
  63. }
  64. defer file.Close()
  65. err2 = asbp.parseCSV(start, end, csv.NewReader(bytes.NewReader(blobBytes)), resultFn)
  66. if err2 != nil {
  67. asbp.ConnectionStatus = cloud.ParseError
  68. return err2
  69. }
  70. } else {
  71. blobBytes, err2 := asbp.DownloadBlob(blobName, client, ctx)
  72. if err2 != nil {
  73. asbp.ConnectionStatus = cloud.FailedConnection
  74. return err2
  75. }
  76. err2 = asbp.parseCSV(start, end, csv.NewReader(bytes.NewReader(blobBytes)), resultFn)
  77. if err2 != nil {
  78. asbp.ConnectionStatus = cloud.ParseError
  79. return err2
  80. }
  81. }
  82. }
  83. asbp.ConnectionStatus = cloud.SuccessfulConnection
  84. return nil
  85. }
  86. func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *csv.Reader, resultFn AzureBillingResultFunc) error {
  87. headers, err := reader.Read()
  88. if err != nil {
  89. return err
  90. }
  91. abp, err := NewBillingParseSchema(headers)
  92. if err != nil {
  93. return err
  94. }
  95. for {
  96. var record, err = reader.Read()
  97. if err == io.EOF {
  98. break
  99. }
  100. if err != nil {
  101. return err
  102. }
  103. abv := abp.ParseRow(start, end, record)
  104. if abv == nil {
  105. continue
  106. }
  107. err = resultFn(abv)
  108. if err != nil {
  109. return err
  110. }
  111. }
  112. return nil
  113. }
  114. func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time, client *azblob.Client, ctx context.Context) ([]string, error) {
  115. log.Infof("Azure Storage: retrieving most recent reports from: %v - %v", start, end)
  116. // Get list of month substrings for months contained in the start to end range
  117. monthStrs, err := asbp.getMonthStrings(start, end)
  118. if err != nil {
  119. return nil, err
  120. }
  121. mostResentBlobs := make(map[string]container.BlobItem)
  122. pager := client.NewListBlobsFlatPager(asbp.Container, &azblob.ListBlobsFlatOptions{
  123. Include: container.ListBlobsInclude{Deleted: false, Versions: false},
  124. })
  125. for pager.More() {
  126. resp, err := pager.NextPage(ctx)
  127. if err != nil {
  128. return nil, err
  129. }
  130. // Using the list of months strings find the most resent blob for each month in the range
  131. for _, blobInfo := range resp.Segment.BlobItems {
  132. if blobInfo.Name == nil {
  133. continue
  134. }
  135. // If Container Path configuration exists, check if it is in the blobs name
  136. if asbp.Path != "" && !strings.Contains(*blobInfo.Name, asbp.Path) {
  137. continue
  138. }
  139. for _, month := range monthStrs {
  140. if strings.Contains(*blobInfo.Name, month) {
  141. // check if blob is the newest seen for this month
  142. if prevBlob, ok := mostResentBlobs[month]; ok {
  143. if prevBlob.Properties.CreationTime.After(*blobInfo.Properties.CreationTime) {
  144. continue
  145. }
  146. }
  147. mostResentBlobs[month] = *blobInfo
  148. }
  149. }
  150. }
  151. }
  152. // convert blob names into blob urls and move from map into ordered list of blob names
  153. var blobNames []string
  154. for _, month := range monthStrs {
  155. if blob, ok := mostResentBlobs[month]; ok {
  156. blobNames = append(blobNames, *blob.Name)
  157. }
  158. }
  159. return blobNames, nil
  160. }
  161. func (asbp *AzureStorageBillingParser) getMonthStrings(start, end time.Time) ([]string, error) {
  162. if start.After(end) {
  163. return []string{}, fmt.Errorf("start date must be before end date")
  164. }
  165. if end.After(time.Now()) {
  166. end = time.Now()
  167. }
  168. var monthStrs []string
  169. monthStr := asbp.timeToMonthString(start)
  170. endStr := asbp.timeToMonthString(end)
  171. monthStrs = append(monthStrs, monthStr)
  172. currMonth := start.AddDate(0, 0, -start.Day()+1)
  173. for monthStr != endStr {
  174. currMonth = currMonth.AddDate(0, 1, 0)
  175. monthStr = asbp.timeToMonthString(currMonth)
  176. monthStrs = append(monthStrs, monthStr)
  177. }
  178. return monthStrs, nil
  179. }
  180. func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string {
  181. format := "20060102"
  182. startOfMonth := input.AddDate(0, 0, -input.Day()+1)
  183. endOfMonth := input.AddDate(0, 1, -input.Day())
  184. return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
  185. }