storagebillingparser.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package azure
  2. import (
  3. "context"
  4. "encoding/csv"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "time"
  12. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  13. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
  14. "github.com/opencost/opencost/core/pkg/log"
  15. "github.com/opencost/opencost/core/pkg/util/timeutil"
  16. "github.com/opencost/opencost/pkg/cloud"
  17. "github.com/opencost/opencost/pkg/env"
  18. )
  19. // AzureStorageBillingParser accesses billing data stored in CSV files in Azure Storage
  20. type AzureStorageBillingParser struct {
  21. StorageConnection
  22. }
  23. func (asbp *AzureStorageBillingParser) Equals(config cloud.Config) bool {
  24. thatConfig, ok := config.(*AzureStorageBillingParser)
  25. if !ok {
  26. return false
  27. }
  28. return asbp.StorageConnection.Equals(&thatConfig.StorageConnection)
  29. }
  30. type AzureBillingResultFunc func(*BillingRowValues) error
  // ParseBillingData finds the billing export blobs covering [start, end],
// parses each one as CSV and passes every row accepted by the billing schema
// to resultFn.
//
// When env.IsAzureDownloadBillingDataToDisk() is true, stale files in
// env.GetAzureDownloadBillingDataPath() are first removed via
// deleteFilesOlderThan7d (failures are only logged). Each blob is then
// downloaded into that directory and parsed from disk. Otherwise each blob is
// streamed and parsed directly.
//
// ConnectionStatus is updated to reflect the outcome:
//   - InvalidConfiguration when Validate fails;
//   - FailedConnection when listing, client creation, download or opening a
//     local file fails;
//   - ParseError when a CSV cannot be parsed or resultFn returns an error;
//   - MissingData when no blobs were found and no earlier call had succeeded;
//   - SuccessfulConnection otherwise.
func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, resultFn AzureBillingResultFunc) error {
	err := asbp.Validate()
	if err != nil {
		asbp.ConnectionStatus = cloud.InvalidConfiguration
		return err
	}

	// most recent blob list contains information on blob including name and lastMod time
	// Example blobNames: [ export/myExport/20240101-20240131/myExport_758a42af-0731-4edb-b498-1e523bb40f12.csv ]
	ctx := context.Background()
	blobInfos, err := asbp.getBlobInfos(ctx, start, end)
	if err != nil {
		asbp.ConnectionStatus = cloud.FailedConnection
		return err
	}

	// An empty listing only counts as missing data if no earlier call succeeded.
	if len(blobInfos) == 0 && asbp.ConnectionStatus != cloud.SuccessfulConnection {
		asbp.ConnectionStatus = cloud.MissingData
		return nil
	}

	client, err := asbp.getClient()
	if err != nil {
		asbp.ConnectionStatus = cloud.FailedConnection
		return err
	}

	if env.IsAzureDownloadBillingDataToDisk() {
		// clean up old files that have been saved to disk before downloading new ones
		localPath := env.GetAzureDownloadBillingDataPath()
		if _, err := asbp.deleteFilesOlderThan7d(localPath); err != nil {
			log.Warnf("CloudCost: Azure: ParseBillingData: failed to remove the following stale files: %v", err)
		}
		for _, blob := range blobInfos {
			// Use entire blob name to prevent collision with other files from previous months or other integrations (ex "part_0_0001.csv")
			localFilePath := filepath.Join(localPath, strings.ReplaceAll(*blob.Name, "/", "_"))
			if err := asbp.DownloadBlobToFile(localFilePath, blob, client, ctx); err != nil {
				asbp.ConnectionStatus = cloud.FailedConnection
				return err
			}
			// Parse in a helper so each file is closed before the next blob is
			// processed, rather than every handle staying open (via a deferred
			// Close in this loop) until ParseBillingData returns.
			if err := asbp.parseLocalBillingFile(start, end, localFilePath, resultFn); err != nil {
				return err
			}
		}
	} else {
		for _, blobInfo := range blobInfos {
			streamReader, err := asbp.StreamBlob(*blobInfo.Name, client)
			if err != nil {
				asbp.ConnectionStatus = cloud.FailedConnection
				return err
			}
			if err := asbp.parseCSV(start, end, csv.NewReader(streamReader), resultFn); err != nil {
				asbp.ConnectionStatus = cloud.ParseError
				return err
			}
		}
	}

	asbp.ConnectionStatus = cloud.SuccessfulConnection
	return nil
}

// parseLocalBillingFile opens the CSV file at path, feeds it through parseCSV
// and closes it before returning. A failure to open the file sets
// ConnectionStatus to FailedConnection; a parse failure sets it to ParseError.
func (asbp *AzureStorageBillingParser) parseLocalBillingFile(start, end time.Time, path string, resultFn AzureBillingResultFunc) error {
	fp, err := os.Open(path)
	if err != nil {
		asbp.ConnectionStatus = cloud.FailedConnection
		return err
	}
	defer fp.Close()

	if err := asbp.parseCSV(start, end, csv.NewReader(fp), resultFn); err != nil {
		asbp.ConnectionStatus = cloud.ParseError
		return err
	}
	return nil
}
  99. func (asbp *AzureStorageBillingParser) getClient() (*azblob.Client, error) {
  100. serviceURL := fmt.Sprintf(asbp.StorageConnection.getBlobURLTemplate(), asbp.Account, "")
  101. client, err := asbp.Authorizer.GetBlobClient(serviceURL)
  102. if err != nil {
  103. return nil, err
  104. }
  105. return client, nil
  106. }
  107. func (asbp *AzureStorageBillingParser) getBlobInfos(ctx context.Context, start, end time.Time) ([]container.BlobItem, error) {
  108. client, err := asbp.getClient()
  109. if err != nil {
  110. return nil, err
  111. }
  112. blobInfos, err := asbp.getMostRecentBlobs(start, end, client, ctx)
  113. if err != nil {
  114. return nil, err
  115. }
  116. return blobInfos, nil
  117. }
  // RefreshStatus probes the container for billing exports covering the seven
// days before today's UTC midnight and records the result in
// ConnectionStatus: FailedConnection if listing fails, MissingData if no blobs
// matched, SuccessfulConnection otherwise. The new status is also returned.
func (asbp *AzureStorageBillingParser) RefreshStatus() cloud.ConnectionStatus {
	windowEnd := time.Now().UTC().Truncate(timeutil.Day)
	windowStart := windowEnd.Add(-7 * timeutil.Day)

	found, err := asbp.getBlobInfos(context.Background(), windowStart, windowEnd)
	switch {
	case err != nil:
		asbp.ConnectionStatus = cloud.FailedConnection
	case len(found) == 0:
		asbp.ConnectionStatus = cloud.MissingData
	default:
		asbp.ConnectionStatus = cloud.SuccessfulConnection
	}
	return asbp.ConnectionStatus
}
  132. func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *csv.Reader, resultFn AzureBillingResultFunc) error {
  133. headers, err := reader.Read()
  134. if err != nil {
  135. return err
  136. }
  137. abp, err := NewBillingParseSchema(headers)
  138. if err != nil {
  139. return err
  140. }
  141. for {
  142. var record, err = reader.Read()
  143. if err == io.EOF {
  144. break
  145. }
  146. if err != nil {
  147. return err
  148. }
  149. abv := abp.ParseRow(start, end, record)
  150. if abv == nil {
  151. continue
  152. }
  153. err = resultFn(abv)
  154. if err != nil {
  155. return err
  156. }
  157. }
  158. return nil
  159. }
  // getMostRecentBlobs returns the blobs in the Azure Storage container that are
// needed to answer a query over [start, end].
//
// Blobs are grouped by the "YYYYMMDD-YYYYMMDD" month substrings produced by
// getMonthStrings and, when asbp.Path is set, must also contain that path.
// For each month:
//   - if any manifest.json blobs exist, the most recently modified manifest is
//     downloaded and every blob it names (that was also found in the listing)
//     is returned;
//   - otherwise the single blob with the latest "Last Modified Time" is
//     returned, since it holds the latest month-to-date billing data.
//
// Months are processed in getMonthStrings order, so the result order is
// deterministic. Listing entries without a name or last-modified time are
// skipped.
func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time, client *azblob.Client, ctx context.Context) ([]container.BlobItem, error) {
	log.Infof("Azure Storage: retrieving most recent reports from: %v - %v", start, end)

	// Get list of month substrings for months contained in the start to end range
	monthStrs, err := asbp.getMonthStrings(start, end)
	if err != nil {
		return nil, err
	}

	blobsForMonth, err := asbp.listBlobsByMonth(ctx, client, monthStrs)
	if err != nil {
		return nil, err
	}

	// build list of most recent blobs that are needed to fulfil a query on the given date range
	var blobs []container.BlobItem
	for _, month := range monthStrs {
		monthBlobs, ok := blobsForMonth[month]
		if !ok {
			continue
		}
		selected, err := asbp.selectMonthBlobs(ctx, client, monthBlobs)
		if err != nil {
			return nil, err
		}
		blobs = append(blobs, selected...)
	}
	return blobs, nil
}

// listBlobsByMonth pages through the container and indexes every blob whose
// name contains one of monthStrs (and asbp.Path, when set), keyed first by
// month string and then by blob name.
func (asbp *AzureStorageBillingParser) listBlobsByMonth(ctx context.Context, client *azblob.Client, monthStrs []string) (map[string]map[string]container.BlobItem, error) {
	blobsForMonth := make(map[string]map[string]container.BlobItem)
	pager := client.NewListBlobsFlatPager(asbp.Container, &azblob.ListBlobsFlatOptions{
		Include: container.ListBlobsInclude{Deleted: false, Versions: false},
	})
	for pager.More() {
		resp, err := pager.NextPage(ctx)
		if err != nil {
			return nil, err
		}
		if resp.Segment == nil {
			continue
		}
		for _, blobInfo := range resp.Segment.BlobItems {
			// Entries lacking a name or last-modified time can be neither matched
			// nor ranked; skipping them avoids nil dereferences when selecting.
			if blobInfo == nil || blobInfo.Name == nil ||
				blobInfo.Properties == nil || blobInfo.Properties.LastModified == nil {
				continue
			}
			name := *blobInfo.Name
			// If Container Path configuration exists, check if it is in the blobs name
			if asbp.Path != "" && !strings.Contains(name, asbp.Path) {
				continue
			}
			for _, month := range monthStrs {
				if !strings.Contains(name, month) {
					continue
				}
				if _, ok := blobsForMonth[month]; !ok {
					blobsForMonth[month] = make(map[string]container.BlobItem)
				}
				blobsForMonth[month][name] = *blobInfo
			}
		}
	}
	return blobsForMonth, nil
}

// selectMonthBlobs picks the blobs to parse for one month from monthBlobs
// (keyed by blob name, every entry having a non-nil LastModified). If the
// month has manifest.json blobs, the newest manifest is downloaded and the
// blobs it names are returned; otherwise the newest non-manifest blob is.
func (asbp *AzureStorageBillingParser) selectMonthBlobs(ctx context.Context, client *azblob.Client, monthBlobs map[string]container.BlobItem) ([]container.BlobItem, error) {
	var mostRecentBlob, mostRecentManifest *container.BlobItem
	for name := range monthBlobs {
		// Copy into a per-iteration variable so taking its address is safe on
		// Go versions where range variables are shared across iterations.
		blob := monthBlobs[name]
		lastMod := *blob.Properties.LastModified
		if strings.HasSuffix(name, "manifest.json") {
			if mostRecentManifest == nil || mostRecentManifest.Properties.LastModified.Before(lastMod) {
				mostRecentManifest = &blob
			}
			continue
		}
		// Only used when the month has no manifest at all.
		if mostRecentBlob == nil || mostRecentBlob.Properties.LastModified.Before(lastMod) {
			mostRecentBlob = &blob
		}
	}

	// In the absence of a manifest, use the most recent blob
	if mostRecentManifest == nil {
		if mostRecentBlob == nil {
			return nil, nil
		}
		return []container.BlobItem{*mostRecentBlob}, nil
	}

	manifestName := *mostRecentManifest.Name
	manifestBytes, err := asbp.DownloadBlob(manifestName, client, ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve manifest %s: %w", manifestName, err)
	}
	var manifest manifestJson
	if err := json.Unmarshal(manifestBytes, &manifest); err != nil {
		return nil, fmt.Errorf("failed to unmarshal manifest %s: %w", manifestName, err)
	}

	// Add all partitioned blobs named in the manifest to the list of blobs to be retrieved
	blobs := make([]container.BlobItem, 0, len(manifest.Blobs))
	for _, mb := range manifest.Blobs {
		namedBlob, ok := monthBlobs[mb.BlobName]
		if !ok {
			log.Errorf("AzureStorage: failed to find blob named in manifest '%s'", mb.BlobName)
			continue
		}
		blobs = append(blobs, namedBlob)
	}
	return blobs, nil
}
  // Types for decoding the manifest.json file that accompanies an Azure billing
// export. Only the list of blob names is decoded; other manifest keys are
// ignored by encoding/json.
type (
	// manifestJson is the top-level manifest document.
	manifestJson struct {
		Blobs []manifestBlob `json:"blobs"`
	}

	// manifestBlob names one exported blob listed in the manifest.
	manifestBlob struct {
		BlobName string `json:"blobName"`
	}
)
  // getMonthStrings returns a list of month strings in the format
// "YYYYMMDD-YYYYMMDD", where the dates are exactly the first and last day of
// the month. It includes, in ascending order, every month from the one
// containing start through the one containing end. Months are computed in
// start's location. end is capped at the current time unless start itself is
// in the future.
// For example: ["20231201-20231231", "20240101-20240131", "20240201-20240229"]
//
// An error is returned if start is after end.
func (asbp *AzureStorageBillingParser) getMonthStrings(start, end time.Time) ([]string, error) {
	if start.After(end) {
		return []string{}, fmt.Errorf("start date must be before end date")
	}

	// No billing data exists beyond the present, so cap end at now. Skip the cap
	// when start is itself in the future: moving end before start previously
	// made the month walk below loop forever.
	if now := time.Now(); end.After(now) && !start.After(now) {
		end = now
	}
	// Compare months in one location. A local-time end paired with a UTC start
	// could otherwise land in an earlier month than start and never be reached.
	end = end.In(start.Location())
	endYear, endMonth := end.Year(), end.Month()

	var monthStrs []string
	// First day of start's month. Stepping by whole months from day 1 never
	// overflows into the following month.
	currMonth := start.AddDate(0, 0, -start.Day()+1)
	for {
		monthStrs = append(monthStrs, asbp.timeToMonthString(currMonth))
		year, month := currMonth.Year(), currMonth.Month()
		if year > endYear || (year == endYear && month >= endMonth) {
			break
		}
		currMonth = currMonth.AddDate(0, 1, 0)
	}
	return monthStrs, nil
}
  288. func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string {
  289. format := "20060102"
  290. startOfMonth := input.AddDate(0, 0, -input.Day()+1)
  291. endOfMonth := input.AddDate(0, 1, -input.Day())
  292. return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
  293. }