| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- package azure
- import (
- "bytes"
- "context"
- "encoding/csv"
- "fmt"
- "io"
- "strings"
- "time"
- "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
- "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/pkg/cloud"
- )
- // AzureStorageBillingParser accesses billing data stored in CSV files in Azure Storage.
- type AzureStorageBillingParser struct {
- StorageConnection // embedded; the parser's methods read its Account, Container, Path and Authorizer and update its ConnectionStatus
- }
- func (asbp *AzureStorageBillingParser) Equals(config cloud.Config) bool {
- thatConfig, ok := config.(*AzureStorageBillingParser)
- if !ok {
- return false
- }
- return asbp.StorageConnection.Equals(&thatConfig.StorageConnection)
- }
- type AzureBillingResultFunc func(*BillingRowValues) error // invoked by parseCSV once per parsed billing row; a non-nil error stops parsing and is returned to the caller
- func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, resultFn AzureBillingResultFunc) error {
- err := asbp.Validate()
- if err != nil {
- asbp.ConnectionStatus = cloud.InvalidConfiguration
- return err
- }
- serviceURL := fmt.Sprintf(asbp.StorageConnection.getBlobURLTemplate(), asbp.Account, "")
- client, err := asbp.Authorizer.GetBlobClient(serviceURL)
- if err != nil {
- asbp.ConnectionStatus = cloud.FailedConnection
- return err
- }
- ctx := context.Background()
- blobNames, err := asbp.getMostRecentBlobs(start, end, client, ctx)
- if err != nil {
- asbp.ConnectionStatus = cloud.FailedConnection
- return err
- }
- if len(blobNames) == 0 && asbp.ConnectionStatus != cloud.SuccessfulConnection {
- asbp.ConnectionStatus = cloud.MissingData
- return nil
- }
- for _, blobName := range blobNames {
- blobBytes, err2 := asbp.DownloadBlob(blobName, client, ctx)
- if err2 != nil {
- asbp.ConnectionStatus = cloud.FailedConnection
- return err2
- }
- err2 = asbp.parseCSV(start, end, csv.NewReader(bytes.NewReader(blobBytes)), resultFn)
- if err2 != nil {
- asbp.ConnectionStatus = cloud.ParseError
- return err2
- }
- }
- asbp.ConnectionStatus = cloud.SuccessfulConnection
- return nil
- }
- func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *csv.Reader, resultFn AzureBillingResultFunc) error {
- headers, err := reader.Read()
- if err != nil {
- return err
- }
- abp, err := NewBillingParseSchema(headers)
- if err != nil {
- return err
- }
- for {
- var record, err = reader.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
- abv := abp.ParseRow(start, end, record)
- if abv == nil {
- continue
- }
- err = resultFn(abv)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time, client *azblob.Client, ctx context.Context) ([]string, error) {
- log.Infof("Azure Storage: retrieving most recent reports from: %v - %v", start, end)
- // Get list of month substrings for months contained in the start to end range
- monthStrs, err := asbp.getMonthStrings(start, end)
- if err != nil {
- return nil, err
- }
- mostResentBlobs := make(map[string]container.BlobItem)
- pager := client.NewListBlobsFlatPager(asbp.Container, &azblob.ListBlobsFlatOptions{
- Include: container.ListBlobsInclude{Deleted: false, Versions: false},
- })
- for pager.More() {
- resp, err := pager.NextPage(ctx)
- if err != nil {
- return nil, err
- }
- // Using the list of months strings find the most resent blob for each month in the range
- for _, blobInfo := range resp.Segment.BlobItems {
- if blobInfo.Name == nil {
- continue
- }
- // If Container Path configuration exists, check if it is in the blobs name
- if asbp.Path != "" && !strings.Contains(*blobInfo.Name, asbp.Path) {
- continue
- }
- for _, month := range monthStrs {
- if strings.Contains(*blobInfo.Name, month) {
- // check if blob is the newest seen for this month
- if prevBlob, ok := mostResentBlobs[month]; ok {
- if prevBlob.Properties.CreationTime.After(*blobInfo.Properties.CreationTime) {
- continue
- }
- }
- mostResentBlobs[month] = *blobInfo
- }
- }
- }
- }
- // convert blob names into blob urls and move from map into ordered list of blob names
- var blobNames []string
- for _, month := range monthStrs {
- if blob, ok := mostResentBlobs[month]; ok {
- blobNames = append(blobNames, *blob.Name)
- }
- }
- return blobNames, nil
- }
- func (asbp *AzureStorageBillingParser) getMonthStrings(start, end time.Time) ([]string, error) {
- if start.After(end) {
- return []string{}, fmt.Errorf("start date must be before end date")
- }
- if end.After(time.Now()) {
- end = time.Now()
- }
- var monthStrs []string
- monthStr := asbp.timeToMonthString(start)
- endStr := asbp.timeToMonthString(end)
- monthStrs = append(monthStrs, monthStr)
- currMonth := start.AddDate(0, 0, -start.Day()+1)
- for monthStr != endStr {
- currMonth = currMonth.AddDate(0, 1, 0)
- monthStr = asbp.timeToMonthString(currMonth)
- monthStrs = append(monthStrs, monthStr)
- }
- return monthStrs, nil
- }
- func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string {
- format := "20060102"
- startOfMonth := input.AddDate(0, 0, -input.Day()+1)
- endOfMonth := input.AddDate(0, 1, -input.Day())
- return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
- }
|