storagebillingparser.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package azure
  2. import (
  3. "context"
  4. "encoding/csv"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "time"
  11. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  12. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. "github.com/opencost/opencost/pkg/cloud"
  15. "github.com/opencost/opencost/pkg/env"
  16. )
// AzureStorageBillingParser accesses billing data stored in CSV files in Azure Storage.
// It has no state of its own: everything it reads or writes (for example Account,
// Container, Path, Authorizer and ConnectionStatus) is reached through the embedded
// StorageConnection.
type AzureStorageBillingParser struct {
	StorageConnection
}
  21. func (asbp *AzureStorageBillingParser) Equals(config cloud.Config) bool {
  22. thatConfig, ok := config.(*AzureStorageBillingParser)
  23. if !ok {
  24. return false
  25. }
  26. return asbp.StorageConnection.Equals(&thatConfig.StorageConnection)
  27. }
// AzureBillingResultFunc is the callback that receives each parsed billing row.
// parseCSV invokes it once per row for which ParseRow returned a non-nil value;
// a non-nil error returned from the callback stops parsing and is returned to
// the caller.
type AzureBillingResultFunc func(*BillingRowValues) error
  29. func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, resultFn AzureBillingResultFunc) error {
  30. err := asbp.Validate()
  31. if err != nil {
  32. asbp.ConnectionStatus = cloud.InvalidConfiguration
  33. return err
  34. }
  35. serviceURL := fmt.Sprintf(asbp.StorageConnection.getBlobURLTemplate(), asbp.Account, "")
  36. client, err := asbp.Authorizer.GetBlobClient(serviceURL)
  37. if err != nil {
  38. asbp.ConnectionStatus = cloud.FailedConnection
  39. return err
  40. }
  41. ctx := context.Background()
  42. // Example blobNames: [ export/myExport/20240101-20240131/myExport_758a42af-0731-4edb-b498-1e523bb40f12.csv ]
  43. blobNames, err := asbp.getMostRecentBlobs(start, end, client, ctx)
  44. if err != nil {
  45. asbp.ConnectionStatus = cloud.FailedConnection
  46. return err
  47. }
  48. if len(blobNames) == 0 && asbp.ConnectionStatus != cloud.SuccessfulConnection {
  49. asbp.ConnectionStatus = cloud.MissingData
  50. return nil
  51. }
  52. for _, blobName := range blobNames {
  53. localPath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "db", "cloudcost")
  54. localFilePath := filepath.Join(localPath, filepath.Base(blobName))
  55. if _, err := asbp.deleteFilesOlderThan7d(localPath); err != nil {
  56. log.Warnf("CloudCost: Azure: ParseBillingData: failed to remove the following stale files: %v", err)
  57. }
  58. err := asbp.DownloadBlobToFile(localFilePath, blobName, client, ctx)
  59. if err != nil {
  60. asbp.ConnectionStatus = cloud.FailedConnection
  61. return err
  62. }
  63. fp, err := os.Open(localFilePath)
  64. if err != nil {
  65. asbp.ConnectionStatus = cloud.FailedConnection
  66. return err
  67. }
  68. defer fp.Close()
  69. err = asbp.parseCSV(start, end, csv.NewReader(fp), resultFn)
  70. if err != nil {
  71. asbp.ConnectionStatus = cloud.ParseError
  72. return err
  73. }
  74. }
  75. asbp.ConnectionStatus = cloud.SuccessfulConnection
  76. return nil
  77. }
  78. func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *csv.Reader, resultFn AzureBillingResultFunc) error {
  79. headers, err := reader.Read()
  80. if err != nil {
  81. return err
  82. }
  83. abp, err := NewBillingParseSchema(headers)
  84. if err != nil {
  85. return err
  86. }
  87. for {
  88. var record, err = reader.Read()
  89. if err == io.EOF {
  90. break
  91. }
  92. if err != nil {
  93. return err
  94. }
  95. abv := abp.ParseRow(start, end, record)
  96. if abv == nil {
  97. continue
  98. }
  99. err = resultFn(abv)
  100. if err != nil {
  101. return err
  102. }
  103. }
  104. return nil
  105. }
  106. func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time, client *azblob.Client, ctx context.Context) ([]string, error) {
  107. log.Infof("Azure Storage: retrieving most recent reports from: %v - %v", start, end)
  108. // Get list of month substrings for months contained in the start to end range
  109. monthStrs, err := asbp.getMonthStrings(start, end)
  110. if err != nil {
  111. return nil, err
  112. }
  113. mostResentBlobs := make(map[string]container.BlobItem)
  114. pager := client.NewListBlobsFlatPager(asbp.Container, &azblob.ListBlobsFlatOptions{
  115. Include: container.ListBlobsInclude{Deleted: false, Versions: false},
  116. })
  117. for pager.More() {
  118. resp, err := pager.NextPage(ctx)
  119. if err != nil {
  120. return nil, err
  121. }
  122. // Using the list of months strings find the most resent blob for each month in the range
  123. for _, blobInfo := range resp.Segment.BlobItems {
  124. if blobInfo.Name == nil {
  125. continue
  126. }
  127. // If Container Path configuration exists, check if it is in the blobs name
  128. if asbp.Path != "" && !strings.Contains(*blobInfo.Name, asbp.Path) {
  129. continue
  130. }
  131. for _, month := range monthStrs {
  132. if strings.Contains(*blobInfo.Name, month) {
  133. // check if blob is the newest seen for this month
  134. if prevBlob, ok := mostResentBlobs[month]; ok {
  135. if prevBlob.Properties.CreationTime.After(*blobInfo.Properties.CreationTime) {
  136. continue
  137. }
  138. }
  139. mostResentBlobs[month] = *blobInfo
  140. }
  141. }
  142. }
  143. }
  144. // convert blob names into blob urls and move from map into ordered list of blob names
  145. var blobNames []string
  146. for _, month := range monthStrs {
  147. if blob, ok := mostResentBlobs[month]; ok {
  148. blobNames = append(blobNames, *blob.Name)
  149. }
  150. }
  151. return blobNames, nil
  152. }
  153. func (asbp *AzureStorageBillingParser) getMonthStrings(start, end time.Time) ([]string, error) {
  154. if start.After(end) {
  155. return []string{}, fmt.Errorf("start date must be before end date")
  156. }
  157. if end.After(time.Now()) {
  158. end = time.Now()
  159. }
  160. var monthStrs []string
  161. monthStr := asbp.timeToMonthString(start)
  162. endStr := asbp.timeToMonthString(end)
  163. monthStrs = append(monthStrs, monthStr)
  164. currMonth := start.AddDate(0, 0, -start.Day()+1)
  165. for monthStr != endStr {
  166. currMonth = currMonth.AddDate(0, 1, 0)
  167. monthStr = asbp.timeToMonthString(currMonth)
  168. monthStrs = append(monthStrs, monthStr)
  169. }
  170. return monthStrs, nil
  171. }
  172. func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string {
  173. format := "20060102"
  174. startOfMonth := input.AddDate(0, 0, -input.Day()+1)
  175. endOfMonth := input.AddDate(0, 1, -input.Day())
  176. return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
  177. }
  178. // deleteFilesOlderThan7d recursively walks the directory specified and deletes
  179. // files which have not been modified in the last 7 days. Returns a list of
  180. // files deleted.
  181. func (asbp *AzureStorageBillingParser) deleteFilesOlderThan7d(localPath string) ([]string, error) {
  182. duration := 7 * 24 * time.Hour
  183. cleaned := []string{}
  184. errs := []string{}
  185. if _, err := os.Stat(localPath); err != nil {
  186. return cleaned, nil // localPath does not exist
  187. }
  188. filepath.Walk(localPath, func(path string, info os.FileInfo, err error) error {
  189. if err != nil {
  190. errs = append(errs, err.Error())
  191. return err
  192. }
  193. if time.Since(info.ModTime()) > duration {
  194. err := os.Remove(path)
  195. if err != nil {
  196. errs = append(errs, err.Error())
  197. }
  198. cleaned = append(cleaned, path)
  199. }
  200. return nil
  201. })
  202. if len(errs) == 0 {
  203. return cleaned, nil
  204. } else {
  205. return cleaned, fmt.Errorf("deleteFilesOlderThan7d: %v", errs)
  206. }
  207. }