| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package cloud
- import (
- "bytes"
- "context"
- "encoding/csv"
- "fmt"
- "github.com/Azure/azure-storage-blob-go/azblob"
- "github.com/kubecost/cost-model/pkg/env"
- "net/url"
- "strings"
- "time"
- )
// CSVRetriever is implemented by data sources that can supply CSV data for
// the time window between start and end.
type CSVRetriever interface {
	// GetCSVReaders returns CSV readers for the data covering [start, end],
	// or an error if the data cannot be fetched.
	GetCSVReaders(start time.Time, end time.Time) ([]*csv.Reader, error)
}
// AzureCSVRetriever reads CSV exports from an Azure Blob Storage container.
// It carries no state: connection settings are read from the environment
// each time GetCSVReaders is called.
type AzureCSVRetriever struct{}
- func (acr AzureCSVRetriever) GetCSVReaders(start, end time.Time) ([]*csv.Reader, error) {
- containerURL, err := acr.getContainer()
- if err != nil {
- return nil, err
- }
- return acr.getMostRecentFiles(start, end, containerURL)
- }
- func (acr AzureCSVRetriever) getMostRecentFiles(start, end time.Time, containerURL *azblob.ContainerURL) ([]*csv.Reader, error) {
- ctx := context.Background()
- blobNames, err := acr.getMostResentBlobNames(start, end, ctx, containerURL)
- if err != nil {
- return nil, err
- }
- var readers []*csv.Reader
- for _, blobName := range blobNames {
- blobURL := containerURL.NewBlobURL(blobName)
- downloadResponse, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
- if err != nil {
- return nil, err
- }
- // NOTE: automatically retries are performed if the connection fails
- bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
- // read the body into a buffer
- downloadedData := bytes.Buffer{}
- _, err = downloadedData.ReadFrom(bodyStream)
- if err != nil {
- return nil, err
- }
- reader := csv.NewReader(bytes.NewReader(downloadedData.Bytes()))
- readers = append(readers, reader)
- }
- return readers, nil
- }
- func (acr AzureCSVRetriever) getContainer() (*azblob.ContainerURL, error) {
- accountKey := env.GetAzureStorageAccessKey()
- accountName := env.GetAzureStorageAccountName()
- containerName := env.GetAzureStorageContainerName()
- if accountName == "" || accountKey == "" || containerName == "" {
- return nil, fmt.Errorf("set up Azure storage config to access out of cluster costs")
- }
- // Create a default request pipeline using your storage account name and account key.
- credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
- if err != nil {
- return nil, err
- }
- p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
- // From the Azure portal, get your storage account blob service URL endpoint.
- URL, _ := url.Parse(
- fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName))
- // Create a ContainerURL object that wraps the container URL and a request
- // pipeline to make requests.
- containerURL := azblob.NewContainerURL(*URL, p)
- return &containerURL, nil
- }
- func (acr AzureCSVRetriever) getMostResentBlobNames(start, end time.Time, ctx context.Context, containerURL *azblob.ContainerURL) ([]string, error) {
- // Get list of month substrings for months contained in the start to end range
- monthStrs, err := acr.getMonthStrings(start, end)
- if err != nil {
- return nil, err
- }
- mostResentBlobs := make(map[string]azblob.BlobItemInternal)
- for marker := (azblob.Marker{}); marker.NotDone(); {
- // Get a result segment starting with the blob indicated by the current Marker.
- listBlob, err := containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{})
- if err != nil {
- return nil, err
- }
- // ListBlobs returns the start of the next segment; you MUST use this to get
- // the next segment (after processing the current result segment).
- marker = listBlob.NextMarker
- // Using the list of months strings find the most resent blob for each month in the range
- for _, blobInfo := range listBlob.Segment.BlobItems {
- for _, month := range monthStrs {
- if strings.Contains(blobInfo.Name, month) {
- if prevBlob, ok := mostResentBlobs[month]; ok {
- if prevBlob.Properties.CreationTime.After(*blobInfo.Properties.CreationTime) {
- continue
- }
- }
- mostResentBlobs[month] = blobInfo
- }
- }
- }
- }
- // move the blobs names from map into ordered list of blob names
- var blobNames []string
- for _, month := range monthStrs {
- if blob, ok := mostResentBlobs[month]; ok {
- blobNames = append(blobNames, blob.Name)
- }
- }
- return blobNames, nil
- }
- func (acr AzureCSVRetriever) getMonthStrings(start, end time.Time) ([]string, error) {
- if end.After(time.Now()) {
- end = time.Now()
- }
- if start.After(end) {
- return []string{}, fmt.Errorf("start date must be before end date")
- }
- var monthStrs []string
- monthStr := acr.timeToMonthString(start)
- endStr := acr.timeToMonthString(end)
- monthStrs = append(monthStrs, monthStr)
- currMonth := start.AddDate(0, 0, -start.Day()+1)
- for monthStr != endStr {
- currMonth = currMonth.AddDate(0, 1, 0)
- monthStr = acr.timeToMonthString(currMonth)
- monthStrs = append(monthStrs, monthStr)
- }
- return monthStrs, nil
- }
- func (acr AzureCSVRetriever) timeToMonthString(input time.Time) string {
- format := "20060102"
- startOfMonth := input.AddDate(0, 0, -input.Day()+1)
- endOfMonth := input.AddDate(0, 1, -input.Day())
- return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
- }
|