2
0

storagebillingparser.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. package azure
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  14. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
  15. "github.com/opencost/opencost/core/pkg/log"
  16. "github.com/opencost/opencost/core/pkg/util/timeutil"
  17. "github.com/opencost/opencost/pkg/cloud"
  18. "github.com/opencost/opencost/pkg/env"
  19. )
// AzureStorageBillingParser accesses billing data stored in CSV files in Azure Storage.
// Blobs whose name ends in ".gz" are read through a gzip reader before CSV parsing.
// The parser has no fields of its own: everything it reads or updates (for example
// Account, Container, Path and ConnectionStatus) is promoted from the embedded
// StorageConnection.
type AzureStorageBillingParser struct {
	StorageConnection
}
  24. func (asbp *AzureStorageBillingParser) Equals(config cloud.Config) bool {
  25. thatConfig, ok := config.(*AzureStorageBillingParser)
  26. if !ok {
  27. return false
  28. }
  29. return asbp.StorageConnection.Equals(&thatConfig.StorageConnection)
  30. }
  31. // decompressIfGzipped wraps the reader with a gzip reader if the blob name indicates
  32. // the file is gzip compressed. Returns the original reader if not compressed.
  33. func decompressIfGzipped(r io.Reader, blobName string) (io.ReadCloser, error) {
  34. if strings.HasSuffix(strings.ToLower(blobName), ".gz") {
  35. gr, err := gzip.NewReader(r)
  36. if err != nil {
  37. return nil, fmt.Errorf("failed to create gzip reader for %s: %w", blobName, err)
  38. }
  39. return gr, nil
  40. }
  41. // Return a NopCloser to maintain consistent interface
  42. return io.NopCloser(r), nil
  43. }
  44. // processLocalBillingFile reads a local billing file, decompresses if needed, and parses it
  45. func (asbp *AzureStorageBillingParser) processLocalBillingFile(localFilePath, blobName string, start, end time.Time, resultFn AzureBillingResultFunc) error {
  46. fp, err := os.Open(localFilePath)
  47. if err != nil {
  48. return err
  49. }
  50. defer fp.Close()
  51. // Wrap with gzip reader if needed
  52. reader, err := decompressIfGzipped(fp, blobName)
  53. if err != nil {
  54. return err
  55. }
  56. defer reader.Close()
  57. err = asbp.parseCSV(start, end, csv.NewReader(reader), resultFn)
  58. if err != nil {
  59. return err
  60. }
  61. return nil
  62. }
  63. // processStreamBillingData reads streaming billing data, decompresses if needed, and parses it
  64. func (asbp *AzureStorageBillingParser) processStreamBillingData(streamReader io.Reader, blobName string, start, end time.Time, resultFn AzureBillingResultFunc) error {
  65. // Wrap with gzip reader if needed
  66. reader, err := decompressIfGzipped(streamReader, blobName)
  67. if err != nil {
  68. return err
  69. }
  70. defer reader.Close()
  71. err = asbp.parseCSV(start, end, csv.NewReader(reader), resultFn)
  72. if err != nil {
  73. return err
  74. }
  75. return nil
  76. }
// AzureBillingResultFunc is the callback invoked by the parser for every
// billing row that was parsed to a non-nil *BillingRowValues. Returning a
// non-nil error stops parsing and that error is returned to the caller.
type AzureBillingResultFunc func(*BillingRowValues) error
  78. func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, resultFn AzureBillingResultFunc) error {
  79. err := asbp.Validate()
  80. if err != nil {
  81. asbp.ConnectionStatus = cloud.InvalidConfiguration
  82. return err
  83. }
  84. // most recent blob list contains information on blob including name and lastMod time
  85. // Example blobNames: [ export/myExport/20240101-20240131/myExport_758a42af-0731-4edb-b498-1e523bb40f12.csv ]
  86. ctx := context.Background()
  87. blobInfos, err := asbp.getBlobInfos(ctx, start, end)
  88. if err != nil {
  89. asbp.ConnectionStatus = cloud.FailedConnection
  90. return err
  91. }
  92. if len(blobInfos) == 0 && asbp.ConnectionStatus != cloud.SuccessfulConnection {
  93. asbp.ConnectionStatus = cloud.MissingData
  94. return nil
  95. }
  96. client, err := asbp.getClient()
  97. if err != nil {
  98. asbp.ConnectionStatus = cloud.FailedConnection
  99. return err
  100. }
  101. if env.IsAzureDownloadBillingDataToDisk() {
  102. // clean up old files that have been saved to disk before downloading new ones
  103. localPath := env.GetAzureDownloadBillingDataPath()
  104. if _, err := asbp.deleteFilesOlderThan7d(localPath); err != nil {
  105. log.Warnf("CloudCost: Azure: ParseBillingData: failed to remove the following stale files: %v", err)
  106. }
  107. for _, blob := range blobInfos {
  108. blobName := *blob.Name
  109. // Use entire blob name to prevent collision with other files from previous months or other integrations (ex "part_0_0001.csv")
  110. localFilePath := filepath.Join(localPath, strings.ReplaceAll(blobName, "/", "_"))
  111. err := asbp.DownloadBlobToFile(localFilePath, blob, client, ctx)
  112. if err != nil {
  113. asbp.ConnectionStatus = cloud.FailedConnection
  114. return err
  115. }
  116. err = asbp.processLocalBillingFile(localFilePath, blobName, start, end, resultFn)
  117. if err != nil {
  118. asbp.ConnectionStatus = cloud.ParseError
  119. return err
  120. }
  121. }
  122. } else {
  123. for _, blobInfo := range blobInfos {
  124. blobName := *blobInfo.Name
  125. streamReader, err := asbp.StreamBlob(blobName, client)
  126. if err != nil {
  127. asbp.ConnectionStatus = cloud.FailedConnection
  128. return err
  129. }
  130. err = asbp.processStreamBillingData(streamReader, blobName, start, end, resultFn)
  131. if err != nil {
  132. asbp.ConnectionStatus = cloud.ParseError
  133. return err
  134. }
  135. }
  136. }
  137. asbp.ConnectionStatus = cloud.SuccessfulConnection
  138. return nil
  139. }
  140. func (asbp *AzureStorageBillingParser) getClient() (*azblob.Client, error) {
  141. serviceURL := fmt.Sprintf(asbp.StorageConnection.getBlobURLTemplate(), asbp.Account, "")
  142. client, err := asbp.Authorizer.GetBlobClient(serviceURL)
  143. if err != nil {
  144. return nil, err
  145. }
  146. return client, nil
  147. }
  148. func (asbp *AzureStorageBillingParser) getBlobInfos(ctx context.Context, start, end time.Time) ([]container.BlobItem, error) {
  149. client, err := asbp.getClient()
  150. if err != nil {
  151. return nil, err
  152. }
  153. blobInfos, err := asbp.getMostRecentBlobs(start, end, client, ctx)
  154. if err != nil {
  155. return nil, err
  156. }
  157. return blobInfos, nil
  158. }
  159. func (asbp *AzureStorageBillingParser) RefreshStatus() cloud.ConnectionStatus {
  160. end := time.Now().UTC().Truncate(timeutil.Day)
  161. start := end.Add(-7 * timeutil.Day)
  162. ctx := context.Background()
  163. blobInfos, err := asbp.getBlobInfos(ctx, start, end)
  164. if err != nil {
  165. asbp.ConnectionStatus = cloud.FailedConnection
  166. } else if len(blobInfos) == 0 {
  167. asbp.ConnectionStatus = cloud.MissingData
  168. } else {
  169. asbp.ConnectionStatus = cloud.SuccessfulConnection
  170. }
  171. return asbp.ConnectionStatus
  172. }
  173. func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *csv.Reader, resultFn AzureBillingResultFunc) error {
  174. headers, err := reader.Read()
  175. if err != nil {
  176. return err
  177. }
  178. abp, err := NewBillingParseSchema(headers)
  179. if err != nil {
  180. return err
  181. }
  182. for {
  183. var record, err = reader.Read()
  184. if err == io.EOF {
  185. break
  186. }
  187. if err != nil {
  188. return err
  189. }
  190. abv := abp.ParseRow(start, end, record)
  191. if abv == nil {
  192. continue
  193. }
  194. err = resultFn(abv)
  195. if err != nil {
  196. return err
  197. }
  198. }
  199. return nil
  200. }
  201. // getMostRecentBlobs returns a list of blobs in the Azure Storage
  202. // Container. It uses the "Last Modified Time" of the file to determine which
  203. // has the latest month-to-date billing data.
  204. func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time, client *azblob.Client, ctx context.Context) ([]container.BlobItem, error) {
  205. log.Infof("Azure Storage: retrieving most recent reports from: %v - %v", start, end)
  206. // Get list of month substrings for months contained in the start to end range
  207. monthStrs, err := asbp.getMonthStrings(start, end)
  208. if err != nil {
  209. return nil, err
  210. }
  211. // Build map of blobs keyed by month string and blob name
  212. blobsForMonth := make(map[string]map[string]container.BlobItem)
  213. pager := client.NewListBlobsFlatPager(asbp.Container, &azblob.ListBlobsFlatOptions{
  214. Include: container.ListBlobsInclude{Deleted: false, Versions: false},
  215. })
  216. for pager.More() {
  217. resp, err := pager.NextPage(ctx)
  218. if err != nil {
  219. return nil, err
  220. }
  221. // Using the list of months strings find the most resent blob for each month in the range
  222. for _, blobInfo := range resp.Segment.BlobItems {
  223. if blobInfo.Name == nil {
  224. continue
  225. }
  226. // If Container Path configuration exists, check if it is in the blobs name
  227. if asbp.Path != "" && !strings.Contains(*blobInfo.Name, asbp.Path) {
  228. continue
  229. }
  230. for _, month := range monthStrs {
  231. if strings.Contains(*blobInfo.Name, month) {
  232. if _, ok := blobsForMonth[month]; !ok {
  233. blobsForMonth[month] = make(map[string]container.BlobItem)
  234. }
  235. blobsForMonth[month][*blobInfo.Name] = *blobInfo
  236. }
  237. }
  238. }
  239. }
  240. // build list of most recent blobs that are needed to fulfil a query on the give date range
  241. var blobs []container.BlobItem
  242. for _, monthBlobs := range blobsForMonth {
  243. // Find most recent blob
  244. var mostRecentBlob *container.BlobItem
  245. var mostRecentManifest *container.BlobItem
  246. for name := range monthBlobs {
  247. blob := monthBlobs[name]
  248. lastMod := *blob.Properties.LastModified
  249. // Handle manifest files
  250. if strings.HasSuffix(*blob.Name, "manifest.json") {
  251. if mostRecentManifest == nil {
  252. mostRecentManifest = &blob
  253. continue
  254. }
  255. if mostRecentManifest.Properties.LastModified.Before(lastMod) {
  256. mostRecentManifest = &blob
  257. }
  258. // Only look at non-manifest blobs if manifests are not present
  259. } else if mostRecentManifest == nil {
  260. if mostRecentBlob == nil {
  261. mostRecentBlob = &blob
  262. continue
  263. }
  264. if mostRecentBlob.Properties.LastModified.Before(lastMod) {
  265. mostRecentBlob = &blob
  266. }
  267. }
  268. }
  269. // In the absence of a manifest, add the most recent blob
  270. if mostRecentManifest == nil {
  271. if mostRecentBlob != nil {
  272. blobs = append(blobs, *mostRecentBlob)
  273. }
  274. continue
  275. }
  276. // download manifest for the month
  277. manifestBytes, err := asbp.DownloadBlob(*mostRecentManifest.Name, client, ctx)
  278. if err != nil {
  279. return nil, fmt.Errorf("failed to retrieve manifest %w", err)
  280. }
  281. var manifest manifestJson
  282. err = json.Unmarshal(manifestBytes, &manifest)
  283. if err != nil {
  284. return nil, fmt.Errorf("failed to unmarshal manifest %w", err)
  285. }
  286. // Add all partitioned blobs named in the manifest to the list of blobs to be retrieved
  287. for _, mb := range manifest.Blobs {
  288. namedBlob, ok := monthBlobs[mb.BlobName]
  289. if !ok {
  290. log.Errorf("AzureStorage: failed to find blob named in manifest '%s'", mb.BlobName)
  291. continue
  292. }
  293. blobs = append(blobs, namedBlob)
  294. }
  295. }
  296. return blobs, nil
  297. }
// manifestJson is a struct for unmarshalling manifest.json files associated with the azure billing export.
// Only the "blobs" array is decoded; any other keys in the manifest are ignored.
type manifestJson struct {
	// Blobs lists the blobs referenced by the manifest.
	Blobs []manifestBlob `json:"blobs"`
}
// manifestBlob is a single entry of the "blobs" array in a manifest.json file.
type manifestBlob struct {
	// BlobName is the blob name as written in the manifest; it is used as the
	// lookup key against the names returned by the container listing.
	BlobName string `json:"blobName"`
}
  305. // getMonthStrings returns a list of month strings in the format
  306. // "YYYYMMDD-YYYYMMDD", where the dates are exactly the first and last day of
  307. // the month. It includes all month strings which would capture the start and
  308. // end parameters.
  309. // For example: ["20240201-20240229", "20240101-20240131", "20231201-20231231"]
  310. func (asbp *AzureStorageBillingParser) getMonthStrings(start, end time.Time) ([]string, error) {
  311. if start.After(end) {
  312. return []string{}, fmt.Errorf("start date must be before end date")
  313. }
  314. if end.After(time.Now()) {
  315. end = time.Now()
  316. }
  317. var monthStrs []string
  318. monthStr := asbp.timeToMonthString(start)
  319. endStr := asbp.timeToMonthString(end)
  320. monthStrs = append(monthStrs, monthStr)
  321. currMonth := start.AddDate(0, 0, -start.Day()+1)
  322. for monthStr != endStr {
  323. currMonth = currMonth.AddDate(0, 1, 0)
  324. monthStr = asbp.timeToMonthString(currMonth)
  325. monthStrs = append(monthStrs, monthStr)
  326. }
  327. return monthStrs, nil
  328. }
  329. func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string {
  330. format := "20060102"
  331. startOfMonth := input.AddDate(0, 0, -input.Day()+1)
  332. endOfMonth := input.AddDate(0, 1, -input.Day())
  333. return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
  334. }