storagebillingparser.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package azure
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/csv"
  6. "fmt"
  7. "io"
  8. "strings"
  9. "time"
  10. "github.com/Azure/azure-storage-blob-go/azblob"
  11. "github.com/opencost/opencost/pkg/cloud"
  12. cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
  13. "github.com/opencost/opencost/pkg/log"
  14. )
// AzureStorageBillingParser accesses billing data stored in CSV files in Azure Storage.
//
// It embeds StorageConnection, so the connection's fields and methods are
// promoted onto the parser. The methods in this file rely on that connection
// for configuration validation, container access and blob download
// (presumably all provided by StorageConnection; confirm against its definition).
type AzureStorageBillingParser struct {
	StorageConnection
}
  19. func (asbp *AzureStorageBillingParser) Equals(config cloudconfig.Config) bool {
  20. thatConfig, ok := config.(*AzureStorageBillingParser)
  21. if !ok {
  22. return false
  23. }
  24. return asbp.StorageConnection.Equals(&thatConfig.StorageConnection)
  25. }
// AzureBillingResultFunc is the callback that receives each billing row
// produced while parsing a billing CSV. Returning a non-nil error stops the
// parse, and that error is handed back to the caller of the parser.
type AzureBillingResultFunc func(*BillingRowValues) error
  27. func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, resultFn AzureBillingResultFunc) (cloud.ConnectionStatus, error) {
  28. err := asbp.Validate()
  29. if err != nil {
  30. return cloud.InvalidConfiguration, err
  31. }
  32. containerURL, err := asbp.getContainer()
  33. if err != nil {
  34. return cloud.FailedConnection, err
  35. }
  36. ctx := context.Background()
  37. blobNames, err := asbp.getMostRecentBlobs(start, end, containerURL, ctx)
  38. if err != nil {
  39. return cloud.FailedConnection, err
  40. }
  41. for _, blobName := range blobNames {
  42. blobBytes, err2 := asbp.DownloadBlob(blobName, containerURL, ctx)
  43. if err2 != nil {
  44. return cloud.FailedConnection, err2
  45. }
  46. err2 = asbp.parseCSV(start, end, csv.NewReader(bytes.NewReader(blobBytes)), resultFn)
  47. if err2 != nil {
  48. return cloud.ParseError, err2
  49. }
  50. }
  51. return cloud.SuccessfulConnection, nil
  52. }
  53. func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *csv.Reader, resultFn AzureBillingResultFunc) error {
  54. headers, err := reader.Read()
  55. if err != nil {
  56. return err
  57. }
  58. abp, err := NewBillingParseSchema(headers)
  59. if err != nil {
  60. return err
  61. }
  62. for {
  63. var record, err = reader.Read()
  64. if err == io.EOF {
  65. break
  66. }
  67. if err != nil {
  68. return err
  69. }
  70. abv := abp.ParseRow(start, end, record)
  71. if abv == nil {
  72. continue
  73. }
  74. err = resultFn(abv)
  75. if err != nil {
  76. return err
  77. }
  78. }
  79. return nil
  80. }
  81. func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time, containerURL *azblob.ContainerURL, ctx context.Context) ([]string, error) {
  82. log.Infof("Azure Storage: retrieving most recent reports from: %v - %v", start, end)
  83. // Get list of month substrings for months contained in the start to end range
  84. monthStrs, err := asbp.getMonthStrings(start, end)
  85. if err != nil {
  86. return nil, err
  87. }
  88. mostResentBlobs := make(map[string]azblob.BlobItemInternal)
  89. for marker := (azblob.Marker{}); marker.NotDone(); {
  90. // Get a result segment starting with the blob indicated by the current Marker.
  91. listBlob, err := containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{})
  92. if err != nil {
  93. return nil, err
  94. }
  95. // ListBlobs returns the start of the next segment; you MUST use this to get
  96. // the next segment (after processing the current result segment).
  97. marker = listBlob.NextMarker
  98. // Using the list of months strings find the most resent blob for each month in the range
  99. for _, blobInfo := range listBlob.Segment.BlobItems {
  100. for _, month := range monthStrs {
  101. if strings.Contains(blobInfo.Name, month) {
  102. // If Container Path configuration exists, check if it is in the blobs name
  103. if asbp.Path != "" && !strings.Contains(blobInfo.Name, asbp.Path) {
  104. continue
  105. }
  106. if prevBlob, ok := mostResentBlobs[month]; ok {
  107. if prevBlob.Properties.CreationTime.After(*blobInfo.Properties.CreationTime) {
  108. continue
  109. }
  110. }
  111. mostResentBlobs[month] = blobInfo
  112. }
  113. }
  114. }
  115. }
  116. // convert blob names into blob urls and move from map into ordered list of blob names
  117. var blobNames []string
  118. for _, month := range monthStrs {
  119. if blob, ok := mostResentBlobs[month]; ok {
  120. blobNames = append(blobNames, blob.Name)
  121. }
  122. }
  123. return blobNames, nil
  124. }
  125. func (asbp *AzureStorageBillingParser) getMonthStrings(start, end time.Time) ([]string, error) {
  126. if start.After(end) {
  127. return []string{}, fmt.Errorf("start date must be before end date")
  128. }
  129. if end.After(time.Now()) {
  130. end = time.Now()
  131. }
  132. var monthStrs []string
  133. monthStr := asbp.timeToMonthString(start)
  134. endStr := asbp.timeToMonthString(end)
  135. monthStrs = append(monthStrs, monthStr)
  136. currMonth := start.AddDate(0, 0, -start.Day()+1)
  137. for monthStr != endStr {
  138. currMonth = currMonth.AddDate(0, 1, 0)
  139. monthStr = asbp.timeToMonthString(currMonth)
  140. monthStrs = append(monthStrs, monthStr)
  141. }
  142. return monthStrs, nil
  143. }
  144. func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string {
  145. format := "20060102"
  146. startOfMonth := input.AddDate(0, 0, -input.Day()+1)
  147. endOfMonth := input.AddDate(0, 1, -input.Day())
  148. return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
  149. }