csvretriever.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package cloud
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/csv"
  6. "fmt"
  7. "github.com/Azure/azure-storage-blob-go/azblob"
  8. "github.com/kubecost/cost-model/pkg/env"
  9. "net/url"
  10. "strings"
  11. "time"
  12. )
  13. type CSVRetriever interface {
  14. GetCSVReaders(start, end time.Time) ([]*csv.Reader, error)
  15. }
  16. type AzureCSVRetriever struct {
  17. }
  18. func (acr AzureCSVRetriever) GetCSVReaders(start, end time.Time) ([]*csv.Reader, error) {
  19. containerURL, err := acr.getContainer()
  20. if err != nil {
  21. return nil, err
  22. }
  23. return acr.getMostRecentFiles(start, end, containerURL)
  24. }
  25. func (acr AzureCSVRetriever) getMostRecentFiles(start, end time.Time, containerURL *azblob.ContainerURL) ([]*csv.Reader, error) {
  26. ctx := context.Background()
  27. blobNames, err := acr.getMostResentBlobNames(start, end, ctx, containerURL)
  28. if err != nil {
  29. return nil, err
  30. }
  31. var readers []*csv.Reader
  32. for _, blobName := range blobNames {
  33. blobURL := containerURL.NewBlobURL(blobName)
  34. downloadResponse, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
  35. if err != nil {
  36. return nil, err
  37. }
  38. // NOTE: automatically retries are performed if the connection fails
  39. bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
  40. // read the body into a buffer
  41. downloadedData := bytes.Buffer{}
  42. _, err = downloadedData.ReadFrom(bodyStream)
  43. if err != nil {
  44. return nil, err
  45. }
  46. reader := csv.NewReader(bytes.NewReader(downloadedData.Bytes()))
  47. readers = append(readers, reader)
  48. }
  49. return readers, nil
  50. }
  51. func (acr AzureCSVRetriever) getContainer() (*azblob.ContainerURL, error) {
  52. accountKey := env.GetAzureStorageAccessKey()
  53. accountName := env.GetAzureStorageAccountName()
  54. containerName := env.GetAzureStorageContainerName()
  55. if accountName == "" || accountKey == "" || containerName == "" {
  56. return nil, fmt.Errorf("set up Azure storage config to access out of cluster costs")
  57. }
  58. // Create a default request pipeline using your storage account name and account key.
  59. credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
  60. if err != nil {
  61. return nil, err
  62. }
  63. p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
  64. // From the Azure portal, get your storage account blob service URL endpoint.
  65. URL, _ := url.Parse(
  66. fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName))
  67. // Create a ContainerURL object that wraps the container URL and a request
  68. // pipeline to make requests.
  69. containerURL := azblob.NewContainerURL(*URL, p)
  70. return &containerURL, nil
  71. }
  72. func (acr AzureCSVRetriever) getMostResentBlobNames(start, end time.Time, ctx context.Context, containerURL *azblob.ContainerURL) ([]string, error) {
  73. // Get list of month substrings for months contained in the start to end range
  74. monthStrs, err := acr.getMonthStrings(start, end)
  75. if err != nil {
  76. return nil, err
  77. }
  78. mostResentBlobs := make(map[string]azblob.BlobItemInternal)
  79. for marker := (azblob.Marker{}); marker.NotDone(); {
  80. // Get a result segment starting with the blob indicated by the current Marker.
  81. listBlob, err := containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{})
  82. if err != nil {
  83. return nil, err
  84. }
  85. // ListBlobs returns the start of the next segment; you MUST use this to get
  86. // the next segment (after processing the current result segment).
  87. marker = listBlob.NextMarker
  88. // Using the list of months strings find the most resent blob for each month in the range
  89. for _, blobInfo := range listBlob.Segment.BlobItems {
  90. for _, month := range monthStrs {
  91. if strings.Contains(blobInfo.Name, month) {
  92. if prevBlob, ok := mostResentBlobs[month]; ok {
  93. if prevBlob.Properties.CreationTime.After(*blobInfo.Properties.CreationTime) {
  94. continue
  95. }
  96. }
  97. mostResentBlobs[month] = blobInfo
  98. }
  99. }
  100. }
  101. }
  102. // move the blobs names from map into ordered list of blob names
  103. var blobNames []string
  104. for _, month := range monthStrs {
  105. if blob, ok := mostResentBlobs[month]; ok {
  106. blobNames = append(blobNames, blob.Name)
  107. }
  108. }
  109. return blobNames, nil
  110. }
  111. func (acr AzureCSVRetriever) getMonthStrings(start, end time.Time) ([]string, error) {
  112. if end.After(time.Now()) {
  113. end = time.Now()
  114. }
  115. if start.After(end) {
  116. return []string{}, fmt.Errorf("start date must be before end date")
  117. }
  118. var monthStrs []string
  119. monthStr := acr.timeToMonthString(start)
  120. endStr := acr.timeToMonthString(end)
  121. monthStrs = append(monthStrs, monthStr)
  122. currMonth := start.AddDate(0, 0, -start.Day()+1)
  123. for monthStr != endStr {
  124. currMonth = currMonth.AddDate(0, 1, 0)
  125. monthStr = acr.timeToMonthString(currMonth)
  126. monthStrs = append(monthStrs, monthStr)
  127. }
  128. return monthStrs, nil
  129. }
  130. func (acr AzureCSVRetriever) timeToMonthString(input time.Time) string {
  131. format := "20060102"
  132. startOfMonth := input.AddDate(0, 0, -input.Day()+1)
  133. endOfMonth := input.AddDate(0, 1, -input.Day())
  134. return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
  135. }