storagebillingparser.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package azure
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/csv"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "time"
  12. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  13. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
  14. "github.com/opencost/opencost/core/pkg/log"
  15. "github.com/opencost/opencost/pkg/cloud"
  16. "github.com/opencost/opencost/pkg/env"
  17. )
  18. // AzureStorageBillingParser accesses billing data stored in CSV files in Azure Storage
  19. type AzureStorageBillingParser struct {
  20. StorageConnection
  21. }
  22. func (asbp *AzureStorageBillingParser) Equals(config cloud.Config) bool {
  23. thatConfig, ok := config.(*AzureStorageBillingParser)
  24. if !ok {
  25. return false
  26. }
  27. return asbp.StorageConnection.Equals(&thatConfig.StorageConnection)
  28. }
  29. type AzureBillingResultFunc func(*BillingRowValues) error
  30. func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, resultFn AzureBillingResultFunc) error {
  31. err := asbp.Validate()
  32. if err != nil {
  33. asbp.ConnectionStatus = cloud.InvalidConfiguration
  34. return err
  35. }
  36. serviceURL := fmt.Sprintf(asbp.StorageConnection.getBlobURLTemplate(), asbp.Account, "")
  37. client, err := asbp.Authorizer.GetBlobClient(serviceURL)
  38. if err != nil {
  39. asbp.ConnectionStatus = cloud.FailedConnection
  40. return err
  41. }
  42. ctx := context.Background()
  43. // Example blobNames: [ export/myExport/20240101-20240131/myExport_758a42af-0731-4edb-b498-1e523bb40f12.csv ]
  44. blobNames, err := asbp.getMostRecentBlobs(start, end, client, ctx)
  45. if err != nil {
  46. asbp.ConnectionStatus = cloud.FailedConnection
  47. return err
  48. }
  49. if len(blobNames) == 0 && asbp.ConnectionStatus != cloud.SuccessfulConnection {
  50. asbp.ConnectionStatus = cloud.MissingData
  51. return nil
  52. }
  53. for _, blobName := range blobNames {
  54. if env.IsAzureDownloadBillingDataToDisk() {
  55. localPath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "db", "cloudcost")
  56. localFilePath := filepath.Join(localPath, filepath.Base(blobName))
  57. if _, err := asbp.deleteFilesOlderThan7d(localPath); err != nil {
  58. log.Warnf("CloudCost: Azure: ParseBillingData: failed to remove the following stale files: %v", err)
  59. }
  60. err := asbp.DownloadBlobToFile(localFilePath, blobName, client, ctx)
  61. if err != nil {
  62. asbp.ConnectionStatus = cloud.FailedConnection
  63. return err
  64. }
  65. fp, err := os.Open(localFilePath)
  66. if err != nil {
  67. asbp.ConnectionStatus = cloud.FailedConnection
  68. return err
  69. }
  70. defer fp.Close()
  71. err = asbp.parseCSV(start, end, csv.NewReader(fp), resultFn)
  72. if err != nil {
  73. asbp.ConnectionStatus = cloud.ParseError
  74. return err
  75. }
  76. } else {
  77. blobBytes, err2 := asbp.DownloadBlob(blobName, client, ctx)
  78. if err2 != nil {
  79. asbp.ConnectionStatus = cloud.FailedConnection
  80. return err2
  81. }
  82. err2 = asbp.parseCSV(start, end, csv.NewReader(bytes.NewReader(blobBytes)), resultFn)
  83. if err2 != nil {
  84. asbp.ConnectionStatus = cloud.ParseError
  85. return err2
  86. }
  87. }
  88. }
  89. asbp.ConnectionStatus = cloud.SuccessfulConnection
  90. return nil
  91. }
  92. func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *csv.Reader, resultFn AzureBillingResultFunc) error {
  93. headers, err := reader.Read()
  94. if err != nil {
  95. return err
  96. }
  97. abp, err := NewBillingParseSchema(headers)
  98. if err != nil {
  99. return err
  100. }
  101. for {
  102. var record, err = reader.Read()
  103. if err == io.EOF {
  104. break
  105. }
  106. if err != nil {
  107. return err
  108. }
  109. abv := abp.ParseRow(start, end, record)
  110. if abv == nil {
  111. continue
  112. }
  113. err = resultFn(abv)
  114. if err != nil {
  115. return err
  116. }
  117. }
  118. return nil
  119. }
  120. // getMostRecentBlobs returns a list of filepaths on the Azure Storage
  121. // Container. It uses the "Last Modified Time" of the file to determine which
  122. // has the latest month-to-date billing data.
  123. func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time, client *azblob.Client, ctx context.Context) ([]string, error) {
  124. log.Infof("Azure Storage: retrieving most recent reports from: %v - %v", start, end)
  125. // Get list of month substrings for months contained in the start to end range
  126. monthStrs, err := asbp.getMonthStrings(start, end)
  127. if err != nil {
  128. return nil, err
  129. }
  130. mostRecentBlobs := make(map[string]container.BlobItem)
  131. pager := client.NewListBlobsFlatPager(asbp.Container, &azblob.ListBlobsFlatOptions{
  132. Include: container.ListBlobsInclude{Deleted: false, Versions: false},
  133. })
  134. for pager.More() {
  135. resp, err := pager.NextPage(ctx)
  136. if err != nil {
  137. return nil, err
  138. }
  139. // Using the list of months strings find the most resent blob for each month in the range
  140. for _, blobInfo := range resp.Segment.BlobItems {
  141. if blobInfo.Name == nil {
  142. continue
  143. }
  144. // If Container Path configuration exists, check if it is in the blobs name
  145. if asbp.Path != "" && !strings.Contains(*blobInfo.Name, asbp.Path) {
  146. continue
  147. }
  148. for _, month := range monthStrs {
  149. if strings.Contains(*blobInfo.Name, month) {
  150. // check if blob is the newest seen for this month
  151. if prevBlob, ok := mostRecentBlobs[month]; ok {
  152. if prevBlob.Properties.CreationTime.After(*blobInfo.Properties.CreationTime) {
  153. continue
  154. }
  155. }
  156. mostRecentBlobs[month] = *blobInfo
  157. }
  158. }
  159. }
  160. }
  161. // convert blob names into blob urls and move from map into ordered list of blob names
  162. var blobNames []string
  163. for _, month := range monthStrs {
  164. if blob, ok := mostRecentBlobs[month]; ok {
  165. blobNames = append(blobNames, *blob.Name)
  166. }
  167. }
  168. return blobNames, nil
  169. }
  170. // getMonthStrings returns a list of month strings in the format
  171. // "YYYYMMDD-YYYYMMDD", where the dates are exactly the first and last day of
  172. // the month. It includes all month strings which would capture the start and
  173. // end parameters.
  174. // For example: ["20240201-20240229", "20240101-20240131", "20231201-20231231"]
  175. func (asbp *AzureStorageBillingParser) getMonthStrings(start, end time.Time) ([]string, error) {
  176. if start.After(end) {
  177. return []string{}, fmt.Errorf("start date must be before end date")
  178. }
  179. if end.After(time.Now()) {
  180. end = time.Now()
  181. }
  182. var monthStrs []string
  183. monthStr := asbp.timeToMonthString(start)
  184. endStr := asbp.timeToMonthString(end)
  185. monthStrs = append(monthStrs, monthStr)
  186. currMonth := start.AddDate(0, 0, -start.Day()+1)
  187. for monthStr != endStr {
  188. currMonth = currMonth.AddDate(0, 1, 0)
  189. monthStr = asbp.timeToMonthString(currMonth)
  190. monthStrs = append(monthStrs, monthStr)
  191. }
  192. return monthStrs, nil
  193. }
  194. func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string {
  195. format := "20060102"
  196. startOfMonth := input.AddDate(0, 0, -input.Day()+1)
  197. endOfMonth := input.AddDate(0, 1, -input.Day())
  198. return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
  199. }
  200. // deleteFilesOlderThan7d recursively walks the directory specified and deletes
  201. // files which have not been modified in the last 7 days. Returns a list of
  202. // files deleted.
  203. func (asbp *AzureStorageBillingParser) deleteFilesOlderThan7d(localPath string) ([]string, error) {
  204. duration := 7 * 24 * time.Hour
  205. cleaned := []string{}
  206. errs := []string{}
  207. if _, err := os.Stat(localPath); err != nil {
  208. return cleaned, nil // localPath does not exist
  209. }
  210. filepath.Walk(localPath, func(path string, info os.FileInfo, err error) error {
  211. if err != nil {
  212. errs = append(errs, err.Error())
  213. return err
  214. }
  215. if time.Since(info.ModTime()) > duration {
  216. err := os.Remove(path)
  217. if err != nil {
  218. errs = append(errs, err.Error())
  219. }
  220. cleaned = append(cleaned, path)
  221. }
  222. return nil
  223. })
  224. if len(errs) == 0 {
  225. return cleaned, nil
  226. } else {
  227. return cleaned, fmt.Errorf("deleteFilesOlderThan7d: %v", errs)
  228. }
  229. }