2
0
Эх сурвалжийг харах

Merge branch 'develop' into meetings

Matt Ray 3 жил өмнө
parent
commit
7cdd4cfc70

+ 35 - 0
SECURITY.md

@@ -0,0 +1,35 @@
+# OpenCost Security Policy
+
+The OpenCost project greatly appreciates the need for security and timely updates, given our proximity to cloud billing. We are very grateful to the users, security researchers, and developers for reporting security vulnerabilities to us. All reported security vulnerabilities will be carefully assessed, addressed, and responded to.
+
+## Code Security
+
+Application code is version controlled using GitHub. All code changes are tracked with full revision history and are attributable to a specific individual. Code must be reviewed and accepted by a different engineer than the author of the change.
+
+### Dependabot
+
+OpenCost has [Dependabot](https://docs.github.com/en/code-security/supply-chain-security/understanding-your-software-supply-chain/about-supply-chain-security#what-is-dependabot) enabled for assessing dependencies in the project.
+
+## Supported Versions
+
+OpenCost provides security updates for the two most recent minor versions released on GitHub.
+
+For example, if `v1.102.0` is the most recent stable version, we will address security updates for `v1.101.0` and later. Once `v1.103.0` is released, we will no longer provide updates for `v1.101.x` releases.
+
+## Reporting a Vulnerability
+
+The OpenCost project has enabled [Private vulnerability reporting](https://docs.github.com/en/code-security/security-advisories/guidance-on-reporting-and-writing/privately-reporting-a-security-vulnerability) for our repositories which allows for direct reporting of issues to administrators and maintainers in a secure fashion. Please include a thorough description of the issue, the steps you took to create the issue, affected versions, and, if known, mitigations for the issue. The team will help diagnose the severity of the issue and determine how to address the issue. Issues deemed to be non-critical will be filed as GitHub issues. Critical issues will receive immediate attention and be fixed as quickly as possible.
+
+### Kubecost Bug Bounty
+
+Kubecost offers a Bug Bounty program that pays $250 USD for unique, not previously publicly disclosed CVEs and accepted security bug reports submitted to vulnerability-report@kubecost.com.
+
+## Disclosure policy
+
+For known public security vulnerabilities, we will make the disclosure as soon as possible after receiving the report. Vulnerabilities discovered for the first time will be disclosed in accordance with the following process:
+
+1. The received security vulnerability report shall be handed over to the security team for follow-up coordination and repair work.
+2. After the vulnerability is confirmed, we will create a draft Security Advisory on GitHub that lists the details of the vulnerability.
+3. Invite related personnel to discuss the fix.
+4. Create a temporary private fork of the repository on GitHub, and collaborate there to fix the vulnerability.
+5. After the fixed code is merged into all supported versions, the vulnerability will be publicly posted in the GitHub Advisory Database.

+ 1 - 1
go.mod

@@ -8,7 +8,7 @@ require (
 	cloud.google.com/go/storage v1.28.1
 	github.com/Azure/azure-pipeline-go v0.2.3
 	github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
-	github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0
+	github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.1-0.20230323231529-14c481f239ec
 	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2
 	github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
 	github.com/Azure/azure-storage-blob-go v0.15.0

+ 2 - 2
go.sum

@@ -56,8 +56,8 @@ github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVt
 github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
 github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
 github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
-github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0 h1:rTnT/Jrcm+figWlYz4Ixzt0SJVR2cMC8lvZcimipiEY=
-github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0/go.mod h1:ON4tFdPTwRcgWEaVDrN3584Ef+b7GgSJaXxe5fW9t4M=
+github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.1-0.20230323231529-14c481f239ec h1:S83Dzhd3VLyvN2bgFI7/Lgk1etamk3Pk8QQhn3iXt4s=
+github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.1-0.20230323231529-14c481f239ec/go.mod h1:IoxiGSzhL1QHFXa/mlAXCD+sUaP0rxg//yn2w/JH7wg=
 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2 h1:uqM+VoHjVH6zdlkLF2b6O0ZANcHoj3rO0PoQ3jglUJA=
 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2/go.mod h1:twTKAa1E6hLmSDjLhaCkbTMQKc7p/rNLU40rLxGEOCI=
 github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0 h1:leh5DwKv6Ihwi+h60uHtn6UWAxBbZ0q8DwQVMzf61zw=

+ 124 - 0
pkg/cloud/azurepricesheet/client.go

@@ -0,0 +1,124 @@
+package azurepricesheet
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"net/url"
+	"time"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
+	armruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/runtime"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
+)
+
+const (
+	moduleName    = "armconsumption"
+	moduleVersion = "v1.0.0"
+)
+
+// At the moment the consumption pricesheet download API is not a)
+// documented or b) supported by the SDK. This is an implementation of
+// a client in the style of the Azure go SDK - once the API is
+// supported this will be removed.
+
+// PriceSheetClient contains the methods for the PriceSheet group.
+// Don't use this type directly, use NewPriceSheetClient() instead.
+type PriceSheetClient struct {
+	host             string
+	billingAccountID string
+	pl               runtime.Pipeline
+}
+
+// NewClient creates a new instance of PriceSheetClient with the specified values.
+// billingAccountId - Azure Billing Account ID.
+// credential - used to authorize requests. Usually a credential from azidentity.
+// options - pass nil to accept the default values.
+func NewClient(billingAccountID string, credential azcore.TokenCredential, options *arm.ClientOptions) (*PriceSheetClient, error) {
+	if options == nil {
+		options = &arm.ClientOptions{}
+	}
+	ep := cloud.AzurePublic.Services[cloud.ResourceManager].Endpoint
+	if c, ok := options.Cloud.Services[cloud.ResourceManager]; ok {
+		ep = c.Endpoint
+	}
+	pl, err := armruntime.NewPipeline(moduleName, moduleVersion, credential, runtime.PipelineOptions{}, options)
+	if err != nil {
+		return nil, err
+	}
+	client := &PriceSheetClient{
+		billingAccountID: billingAccountID,
+		host:             ep,
+		pl:               pl,
+	}
+	return client, nil
+}
+
+// BeginDownloadByBillingPeriod - requests a pricesheet for a specific billing period `yyyymm`.
+// Returns a Poller that will provide the download URL when the pricesheet is ready.
+// If the operation fails it returns an *azcore.ResponseError type.
+// Generated from API version 2022-06-01
+// billingPeriodName - Billing Period Name `yyyymm`.
+func (client *PriceSheetClient) BeginDownloadByBillingPeriod(ctx context.Context, billingPeriodName string) (*runtime.Poller[PriceSheetClientDownloadResponse], error) {
+	resp, err := client.downloadByBillingPeriodOperation(ctx, billingPeriodName)
+	if err != nil {
+		return nil, err
+	}
+	return runtime.NewPoller[PriceSheetClientDownloadResponse](resp, client.pl, nil)
+}
+
+type PriceSheetClientDownloadResponse struct {
+	ID         string                             `json:"id"`
+	Name       string                             `json:"name"`
+	StartTime  time.Time                          `json:"startTime"`
+	EndTime    time.Time                          `json:"endTime"`
+	Status     string                             `json:"status"`
+	Properties PriceSheetClientDownloadProperties `json:"properties"`
+}
+
+type PriceSheetClientDownloadProperties struct {
+	DownloadURL string `json:"downloadUrl"`
+	ValidTill   string `json:"validTill"`
+}
+
+func (client *PriceSheetClient) downloadByBillingPeriodOperation(ctx context.Context, billingPeriodName string) (*http.Response, error) {
+	req, err := client.downloadByBillingPeriodCreateRequest(ctx, billingPeriodName)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := client.pl.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	if !runtime.HasStatusCode(resp, http.StatusOK, http.StatusAccepted) {
+		return nil, runtime.NewResponseError(resp)
+	}
+	return resp, nil
+}
+
+const downloadByBillingPeriodTemplate = "/providers/Microsoft.Billing/billingAccounts/%s/billingPeriods/%s/providers/Microsoft.Consumption/pricesheets/download"
+
+// downloadByBillingPeriodCreateRequest creates the DownloadByBillingPeriod request.
+func (client *PriceSheetClient) downloadByBillingPeriodCreateRequest(ctx context.Context, billingPeriodName string) (*policy.Request, error) {
+	if client.billingAccountID == "" {
+		return nil, errors.New("parameter client.billingAccountID cannot be empty")
+	}
+	if billingPeriodName == "" {
+		return nil, errors.New("parameter billingPeriodName cannot be empty")
+	}
+	urlPath := fmt.Sprintf(downloadByBillingPeriodTemplate, url.PathEscape(client.billingAccountID), url.PathEscape(billingPeriodName))
+	req, err := runtime.NewRequest(ctx, http.MethodGet, runtime.JoinPaths(client.host, urlPath))
+	if err != nil {
+		return nil, err
+	}
+	reqQP := req.Raw().URL.Query()
+	reqQP.Set("api-version", "2022-06-01")
+	reqQP.Set("ln", "en")
+	req.Raw().URL.RawQuery = reqQP.Encode()
+	req.Raw().Header["Accept"] = []string{"*/*"}
+	return req, nil
+}

+ 300 - 0
pkg/cloud/azurepricesheet/downloader.go

@@ -0,0 +1,300 @@
+package azurepricesheet
+
+import (
+	"bufio"
+	"context"
+	"encoding/csv"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/Azure/azure-sdk-for-go/profiles/2020-09-01/commerce/mgmt/commerce"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
+	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
+
+	"github.com/opencost/opencost/pkg/log"
+)
+
+type Downloader[T any] struct {
+	TenantID         string
+	ClientID         string
+	ClientSecret     string
+	BillingAccount   string
+	OfferID          string
+	ConvertMeterInfo func(info commerce.MeterInfo) (map[string]*T, error)
+}
+
+func (d *Downloader[T]) GetPricing(ctx context.Context) (map[string]*T, error) {
+	log.Infof("requesting pricesheet download link")
+	url, err := d.getPricesheetDownloadURL(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("getting download URL: %w", err)
+	}
+	log.Infof("downloading pricesheet from %q", url)
+	data, err := d.saveData(ctx, url, "pricesheet")
+	if err != nil {
+		return nil, fmt.Errorf("saving pricesheet from %q: %w", url, err)
+	}
+	defer data.Close()
+
+	prices, err := d.readPricesheet(ctx, data)
+	if err != nil {
+		return nil, fmt.Errorf("reading pricesheet: %w", err)
+	}
+	log.Infof("loaded %d pricings from pricesheet", len(prices))
+	return prices, nil
+}
+
+func (d *Downloader[T]) getPricesheetDownloadURL(ctx context.Context) (string, error) {
+	cred, err := azidentity.NewClientSecretCredential(d.TenantID, d.ClientID, d.ClientSecret, nil)
+	if err != nil {
+		return "", fmt.Errorf("creating credential: %w", err)
+	}
+	client, err := NewClient(d.BillingAccount, cred, nil)
+	if err != nil {
+		return "", fmt.Errorf("creating pricesheet client: %w", err)
+	}
+	poller, err := client.BeginDownloadByBillingPeriod(ctx, currentBillingPeriod())
+	if err != nil {
+		return "", fmt.Errorf("beginning pricesheet download: %w", err)
+	}
+	resp, err := poller.PollUntilDone(ctx, &runtime.PollUntilDoneOptions{
+		Frequency: 30 * time.Second,
+	})
+	if err != nil {
+		return "", fmt.Errorf("polling for pricesheet: %w", err)
+	}
+	return resp.Properties.DownloadURL, nil
+}
+
+func (d Downloader[T]) saveData(ctx context.Context, url, tempName string) (io.ReadCloser, error) {
+	// Download file from URL in response.
+	out, err := os.CreateTemp("", tempName)
+	if err != nil {
+		return nil, fmt.Errorf("creating %s temp file: %w", tempName, err)
+	}
+
+	resp, err := http.Get(url)
+	if err != nil {
+		return nil, fmt.Errorf("downloading: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("unexpected HTTP status %d", resp.StatusCode)
+	}
+
+	if _, err := io.Copy(out, resp.Body); err != nil {
+		return nil, fmt.Errorf("reading response: %w", err)
+	}
+
+	_, err = out.Seek(0, io.SeekStart)
+	if err != nil {
+		return nil, fmt.Errorf("seeking to start of file: %w", err)
+	}
+
+	return &removeOnClose{File: out}, nil
+}
+
+type removeOnClose struct {
+	*os.File
+}
+
+func (r *removeOnClose) Close() error {
+	err := r.File.Close()
+	if err != nil {
+		return err
+	}
+	return os.Remove(r.Name())
+}
+
+func (d *Downloader[T]) readPricesheet(ctx context.Context, data io.Reader) (map[string]*T, error) {
+	// Avoid double-buffering.
+	buf, ok := (data).(*bufio.Reader)
+	if !ok {
+		buf = bufio.NewReader(data)
+	}
+
+	// The CSV file starts with two lines before the header without
+	// commas (so different numbers of fields as far as the CSV parser
+	// is concerned). Skip them before making the CSV reader so we
+	// still get the benefit of the row length checks after the
+	// header.
+	for i := 0; i < 2; i++ {
+		_, err := buf.ReadBytes('\n')
+		if err != nil {
+			return nil, fmt.Errorf("skipping preamble line %d: %w", i, err)
+		}
+	}
+	reader := csv.NewReader(buf)
+	reader.ReuseRecord = true
+
+	header, err := reader.Read()
+	if err != nil {
+		return nil, fmt.Errorf("reading header: %w", err)
+	}
+	if err := checkPricesheetHeader(header); err != nil {
+		return nil, err
+	}
+
+	units := make(map[string]bool)
+
+	results := make(map[string]*T)
+	lines := 2
+	for {
+		row, err := reader.Read()
+		if err == io.EOF {
+			break
+		}
+		lines++
+		if err != nil {
+			return nil, fmt.Errorf("reading line %d: %w", lines, err)
+		}
+
+		// Skip savings plan - we should be reporting based on the
+		// consumption price because we don't know whether the user is
+		// using a savings plan or over their threshold.
+		if row[pricesheetPriceType] == "Savings Plan" || row[pricesheetOfferID] != d.OfferID {
+			continue
+		}
+
+		// TODO: Creating a meter info for each record will cause a
+		// lot of GC churn - is it worth reusing one meter info instead?
+		meterInfo, err := makeMeterInfo(row)
+		if err != nil {
+			log.Warnf("making meter info (line %d): %v", lines, err)
+			continue
+		}
+
+		pricings, err := d.ConvertMeterInfo(meterInfo)
+		if err != nil {
+			log.Warnf("converting meter to pricings (line %d): %v", lines, err)
+			continue
+		}
+
+		if pricings != nil {
+			units[*meterInfo.Unit] = true
+		}
+
+		for key, pricing := range pricings {
+			results[key] = pricing
+		}
+	}
+
+	if len(results) == 0 {
+		return nil, fmt.Errorf("no matching pricing from price sheet")
+	}
+
+	// Keep track of units seen so we can detect if there are any that
+	// need handling.
+	allUnits := make([]string, 0, len(units))
+	for unit := range units {
+		allUnits = append(allUnits, unit)
+	}
+	sort.Strings(allUnits)
+	log.Infof("all units in pricesheet: %s", strings.Join(allUnits, ", "))
+
+	return results, nil
+}
+
+func checkPricesheetHeader(header []string) error {
+	if len(header) < len(pricesheetCols) {
+		return fmt.Errorf("too few header columns: got %d, expected %d", len(header), len(pricesheetCols))
+	}
+	for col, name := range pricesheetCols {
+		if !strings.EqualFold(header[col], name) {
+			return fmt.Errorf("unexpected header at col %d %q, expected %q", col, header[col], name)
+		}
+	}
+	return nil
+}
+
+func makeMeterInfo(row []string) (commerce.MeterInfo, error) {
+	price, err := strconv.ParseFloat(row[pricesheetUnitPrice], 64)
+	if err != nil {
+		return commerce.MeterInfo{}, fmt.Errorf("parsing unit price: %w", err)
+	}
+	newPrice, unit := normalisePrice(price, row[pricesheetUnit])
+	return commerce.MeterInfo{
+		MeterName:        ptr(row[pricesheetMeterName]),
+		MeterCategory:    ptr(row[pricesheetMeterCategory]),
+		MeterSubCategory: ptr(row[pricesheetMeterSubCategory]),
+		Unit:             &unit,
+		MeterRegion:      ptr(row[pricesheetMeterRegion]),
+		MeterRates:       map[string]*float64{"0": &newPrice},
+	}, nil
+}
+
+var pricesheetCols = []string{
+	"Meter ID",
+	"Meter name",
+	"Meter category",
+	"Meter sub-category",
+	"Meter region",
+	"Unit",
+	"Unit of measure",
+	"Part number",
+	"Unit price",
+	"Currency code",
+	"Included quantity",
+	"Offer Id",
+	"Term",
+	"Price type",
+}
+
+const (
+	pricesheetMeterID          = 0
+	pricesheetMeterName        = 1
+	pricesheetMeterCategory    = 2
+	pricesheetMeterSubCategory = 3
+	pricesheetMeterRegion      = 4
+	pricesheetUnit             = 5
+	pricesheetUnitPrice        = 8
+	pricesheetCurrencyCode     = 9
+	pricesheetOfferID          = 11
+	pricesheetPriceType        = 13
+)
+
+func currentBillingPeriod() string {
+	return time.Now().Format("200601")
+}
+
+func ptr[T any](v T) *T {
+	return &v
+}
+
+// conversions lists all the units seen from the price sheet for
+// prices we're interested in with factors to the corresponding units
+// in the rate card.
+var conversions = map[string]struct {
+	divisor float64
+	unit    string
+}{
+	"1 /Month":       {divisor: 1, unit: "1 /Month"},
+	"1 Hour":         {divisor: 1, unit: "1 Hour"},
+	"1 PiB/Hour":     {divisor: 1_000_000, unit: "1 GiB/Hour"},
+	"10 /Month":      {divisor: 10, unit: "1 /Month"},
+	"10 Hours":       {divisor: 10, unit: "1 Hour"},
+	"100 /Month":     {divisor: 100, unit: "1 /Month"},
+	"100 GB/Month":   {divisor: 100, unit: "1 GB/Month"},
+	"100 Hours":      {divisor: 100, unit: "1 Hour"},
+	"100 TiB/Hour":   {divisor: 100_000, unit: "1 GiB/Hour"},
+	"1000 Hours":     {divisor: 1000, unit: "1 Hour"},
+	"10000 Hours":    {divisor: 10_000, unit: "1 Hour"},
+	"100000 /Hour":   {divisor: 100_000, unit: "1 /Hour"},
+	"1000000 /Hour":  {divisor: 1_000_000, unit: "1 /Hour"},
+	"10000000 /Hour": {divisor: 10_000_000, unit: "1 /Hour"},
+}
+
+func normalisePrice(price float64, unit string) (float64, string) {
+	if conv, ok := conversions[unit]; ok {
+		return price / conv.divisor, conv.unit
+	}
+
+	return price, unit
+}

+ 101 - 0
pkg/cloud/azurepricesheet/downloader_test.go

@@ -0,0 +1,101 @@
+package azurepricesheet
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+
+	"github.com/Azure/azure-sdk-for-go/profiles/2020-09-01/commerce/mgmt/commerce"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDownloader(t *testing.T) {
+	d := Downloader[fakePricing]{
+		TenantID:         "test-tenant-id",
+		ClientID:         "test-client-id",
+		ClientSecret:     "test-client-secret",
+		BillingAccount:   "test-billing-account",
+		OfferID:          "my-offer-id",
+		ConvertMeterInfo: convertMeter,
+	}
+
+	t.Run("read prices", func(t *testing.T) {
+		results, err := d.readPricesheet(context.Background(), strings.NewReader(pricesheetData))
+		require.NoError(t, err)
+
+		// Units and prices are normalised.
+		// Info for saving plans and other offers is skipped.
+		expected := map[string]*fakePricing{
+			"DC96as_v4": {price: "10.505", unit: "1 Hour"},
+			"DC2as_v4":  {price: "0.219", unit: "1 Hour"},
+			"VM1":       {price: "1.0", unit: "1 Hour"},
+			"VM2":       {price: "2.0", unit: "1 Hour"},
+		}
+		require.Equal(t, expected, results)
+	})
+
+	t.Run("bad header", func(t *testing.T) {
+		data := "\n\nMeter ID,Meter name,Meter category,Something else,,,,,,,,,,,,,,\n"
+		_, err := d.readPricesheet(context.Background(), strings.NewReader(data))
+		require.ErrorContains(t, err, `unexpected header at col 3 "Something else", expected "Meter sub-category"`)
+	})
+
+	t.Run("short header", func(t *testing.T) {
+		data := "\n\nMeter ID, Meter name, Meter category, Meter sub-category\n"
+		_, err := d.readPricesheet(context.Background(), strings.NewReader(data))
+		require.ErrorContains(t, err, "too few header columns: got 4, expected 14")
+	})
+
+	t.Run("no matching prices", func(t *testing.T) {
+		d := Downloader[fakePricing]{
+			TenantID:       "test-tenant-id",
+			ClientID:       "test-client-id",
+			ClientSecret:   "test-client-secret",
+			BillingAccount: "test-billing-account",
+			OfferID:        "my-offer-id",
+			ConvertMeterInfo: func(commerce.MeterInfo) (map[string]*fakePricing, error) {
+				return nil, nil
+			},
+		}
+		_, err := d.readPricesheet(context.Background(), strings.NewReader(pricesheetData))
+		require.ErrorContains(t, err, "no matching pricing from price sheet")
+	})
+}
+
+func convertMeter(info commerce.MeterInfo) (map[string]*fakePricing, error) {
+	switch *info.MeterName {
+	case "skip-this":
+		return nil, nil
+	case "multiple-prices":
+		return map[string]*fakePricing{
+			"VM1": {price: "1.0", unit: "1 Hour"},
+			"VM2": {price: "2.0", unit: "1 Hour"},
+		}, nil
+	case "error":
+		return nil, fmt.Errorf("there was an error handling this row!")
+	default:
+		return map[string]*fakePricing{
+			*info.MeterName: {price: fmt.Sprintf("%0.3f", *info.MeterRates["0"]), unit: *info.Unit},
+		}, nil
+	}
+}
+
+type fakePricing struct {
+	price string
+	unit  string
+}
+
+const pricesheetData = `Price Sheet Report for billing period - 202304
+
+Meter ID,Meter name,Meter category,Meter sub-category,Meter region,Unit,Unit of measure,Part number,Unit price,Currency code,Included quantity,Offer Id,Term,Price type
+d4236f8f-3ba6-5a9a-8c6b-14556538c44c,DC96as_v4,Virtual Machines,DCasv4 Series,US East,10 Hours,10 Hours,AAF-70822,105.050000000000000,USD,0.00,my-offer-id,,Consumption
+d4236f8f-3ba6-5a9a-8c6b-14556538c44c,DC96as_v4,Virtual Machines,DCasv4 Series,US East,10 Hours,10 Hours,AAF-70831,60.890000000000000,USD,0.00,other-offer-id,,Consumption
+e47a2c4c-4dc4-55d5-a8d7-ec5b1dcc9c08,DC2as_v4,Virtual Machines,DCasv4 Series,US East,100 Hours,100 Hours,AAF-70890,21.900000000000000,USD,0.000,my-offer-id,,Consumption
+e47a2c4c-4dc4-55d5-a8d7-ec5b1dcc9c08,DC2as_v4,Virtual Machines,DCasv4 Series,US East,100 Hours,100 Hours,AAF-70886,12.700000000000000,USD,0.000,other-offer-id,,Consumption
+cb8d72c0-2b02-5b41-9ac9-2809c04f17ff,DC16as_v4,Virtual Machines,DCasv4 Series,US East,10 Hours,10 Hours,AAF-70911,17.510000000000000,USD,0.00,my-offer-id,,Savings Plan
+cb8d72c0-2b02-5b41-9ac9-2809c04f17ff,DC16as_v4,Virtual Machines,DCasv4 Series,US East,10 Hours,10 Hours,AAF-70910,10.150000000000000,USD,0.00,other-offer-id,,Consumption
+d4236f8f-3ba6-5a9a-8c6b-14556538c44c,skip-this,Virtual Machines,DCasv4 Series,US East,10 Hours,10 Hours,AAF-70822,105.050000000000000,USD,0.00,my-offer-id,,Consumption
+d4236f8f-3ba6-5a9a-8c6b-14556538c44c,multiple-prices,Virtual Machines,DCasv4 Series,US East,10 Hours,10 Hours,AAF-70822,105.050000000000000,USD,0.00,my-offer-id,,Consumption
+d4236f8f-3ba6-5a9a-8c6b-14556538c44c,error,Virtual Machines,DCasv4 Series,US East,10 Hours,10 Hours,AAF-70822,105.050000000000000,USD,0.00,my-offer-id,,Consumption
+`

+ 199 - 104
pkg/cloud/azureprovider.go

@@ -13,8 +13,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/opencost/opencost/pkg/kubecost"
-
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2021-11-01/compute"
 	"github.com/Azure/azure-sdk-for-go/services/preview/commerce/mgmt/2015-06-01-preview/commerce"
 	"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2016-06-01/subscriptions"
@@ -22,13 +20,17 @@ import (
 	"github.com/Azure/go-autorest/autorest"
 	"github.com/Azure/go-autorest/autorest/azure"
 	"github.com/Azure/go-autorest/autorest/azure/auth"
+
+	pricesheet "github.com/opencost/opencost/pkg/cloud/azurepricesheet"
 	"github.com/opencost/opencost/pkg/clustercache"
 	"github.com/opencost/opencost/pkg/env"
+	"github.com/opencost/opencost/pkg/kubecost"
 	"github.com/opencost/opencost/pkg/log"
 	"github.com/opencost/opencost/pkg/util"
 	"github.com/opencost/opencost/pkg/util/fileutil"
 	"github.com/opencost/opencost/pkg/util/json"
 	"github.com/opencost/opencost/pkg/util/timeutil"
+
 	v1 "k8s.io/api/core/v1"
 )
 
@@ -205,7 +207,7 @@ func getRegions(service string, subscriptionsClient subscriptions.Client, provid
 						if loc, ok := allLocations[displName]; ok {
 							supLocations[loc] = displName
 						} else {
-							log.Warnf("unsupported cloud region %s", loc)
+							log.Warnf("unsupported cloud region %q", loc)
 						}
 					}
 					break
@@ -223,7 +225,7 @@ func getRegions(service string, subscriptionsClient subscriptions.Client, provid
 						if loc, ok := allLocations[displName]; ok {
 							supLocations[loc] = displName
 						} else {
-							log.Warnf("unsupported cloud region %s", loc)
+							log.Warnf("unsupported cloud region %q", loc)
 						}
 					}
 					break
@@ -328,7 +330,7 @@ func toRegionID(meterRegion string, regions map[string]string) (string, error) {
 			return regionID, nil
 		}
 	}
-	return "", fmt.Errorf("Couldn't find region")
+	return "", fmt.Errorf("Couldn't find region %q", meterRegion)
 }
 
 // azure has very inconsistent naming standards between display names from the rate card api and display names from the regions api
@@ -395,7 +397,9 @@ type Azure struct {
 	Clientset                      clustercache.ClusterCache
 	Config                         *ProviderConfig
 	serviceAccountChecks           *ServiceAccountChecks
-	RateCardPricingError           error
+	pricingSource                  string
+	rateCardPricingError           error
+	priceSheetPricingError         error
 	clusterAccountID               string
 	clusterRegion                  string
 	loadedAzureSecret              bool
@@ -779,10 +783,19 @@ func (az *Azure) DownloadPricingData() error {
 
 	config, err := az.GetConfig()
 	if err != nil {
-		az.RateCardPricingError = err
+		az.rateCardPricingError = err
 		return err
 	}
 
+	envBillingAccount := env.GetAzureBillingAccount()
+	if envBillingAccount != "" {
+		config.AzureBillingAccount = envBillingAccount
+	}
+	envOfferID := env.GetAzureOfferID()
+	if envOfferID != "" {
+		config.AzureOfferDurableID = envOfferID
+	}
+
 	// Load the service provider keys
 	subscriptionID, clientID, clientSecret, tenantID := az.getAzureRateCardAuth(false, config)
 	config.AzureSubscriptionID = subscriptionID
@@ -798,7 +811,7 @@ func (az *Azure) DownloadPricingData() error {
 		credentialsConfig := NewClientCredentialsConfig(config.AzureClientID, config.AzureClientSecret, config.AzureTenantID, azureEnv)
 		a, err := credentialsConfig.Authorizer()
 		if err != nil {
-			az.RateCardPricingError = err
+			az.rateCardPricingError = err
 			return err
 		}
 		authorizer = a
@@ -810,7 +823,7 @@ func (az *Azure) DownloadPricingData() error {
 		if err != nil {
 			a, err := auth.NewAuthorizerFromFile(azureEnv.ResourceManagerEndpoint)
 			if err != nil {
-				az.RateCardPricingError = err
+				az.rateCardPricingError = err
 				return err
 			}
 			authorizer = a
@@ -832,14 +845,14 @@ func (az *Azure) DownloadPricingData() error {
 	result, err := rcClient.Get(context.TODO(), rateCardFilter)
 	if err != nil {
 		log.Warnf("Error in pricing download query from API")
-		az.RateCardPricingError = err
+		az.rateCardPricingError = err
 		return err
 	}
 
 	regions, err := getRegions("compute", sClient, providersClient, config.AzureSubscriptionID)
 	if err != nil {
 		log.Warnf("Error in pricing download regions from API")
-		az.RateCardPricingError = err
+		az.rateCardPricingError = err
 		return err
 	}
 
@@ -847,107 +860,166 @@ func (az *Azure) DownloadPricingData() error {
 	allPrices := make(map[string]*AzurePricing)
 
 	for _, v := range *result.Meters {
-		meterName := *v.MeterName
-		meterRegion := *v.MeterRegion
-		meterCategory := *v.MeterCategory
-		meterSubCategory := *v.MeterSubCategory
-
-		region, err := toRegionID(meterRegion, regions)
+		pricings, err := convertMeterToPricings(v, regions, baseCPUPrice)
 		if err != nil {
+			log.Warnf("converting meter to pricings: %s", err.Error())
 			continue
 		}
+		for key, pricing := range pricings {
+			allPrices[key] = pricing
+		}
+	}
+	addAzureFilePricing(allPrices, regions)
 
-		if !strings.Contains(meterSubCategory, "Windows") {
-
-			if strings.Contains(meterCategory, "Storage") {
-				if strings.Contains(meterSubCategory, "HDD") || strings.Contains(meterSubCategory, "SSD") || strings.Contains(meterSubCategory, "Premium Files") {
-					var storageClass string = ""
-					if strings.Contains(meterName, "P4 ") {
-						storageClass = AzureDiskPremiumSSDStorageClass
-					} else if strings.Contains(meterName, "E4 ") {
-						storageClass = AzureDiskStandardSSDStorageClass
-					} else if strings.Contains(meterName, "S4 ") {
-						storageClass = AzureDiskStandardStorageClass
-					} else if strings.Contains(meterName, "LRS Provisioned") {
-						storageClass = AzureFilePremiumStorageClass
-					}
-
-					if storageClass != "" {
-						var priceInUsd float64
-
-						if len(v.MeterRates) < 1 {
-							log.Warnf("missing rate info %+v", map[string]interface{}{"MeterSubCategory": *v.MeterSubCategory, "region": region})
-							continue
-						}
-						for _, rate := range v.MeterRates {
-							priceInUsd += *rate
-						}
-						// rate is in disk per month, resolve price per hour, then GB per hour
-						pricePerHour := priceInUsd / 730.0 / 32.0
-						priceStr := fmt.Sprintf("%f", pricePerHour)
-
-						key := region + "," + storageClass
-						log.Debugf("Adding PV.Key: %s, Cost: %s", key, priceStr)
-						allPrices[key] = &AzurePricing{
-							PV: &PV{
-								Cost:   priceStr,
-								Region: region,
-							},
-						}
-					}
-				}
+	az.Pricing = allPrices
+	az.pricingSource = rateCardPricingSource
+	az.rateCardPricingError = nil
+
+	// If we've got a billing account set, kick off downloading the custom pricing data.
+	if config.AzureBillingAccount != "" {
+		downloader := pricesheet.Downloader[AzurePricing]{
+			TenantID:       config.AzureTenantID,
+			ClientID:       config.AzureClientID,
+			ClientSecret:   config.AzureClientSecret,
+			BillingAccount: config.AzureBillingAccount,
+			OfferID:        config.AzureOfferDurableID,
+			ConvertMeterInfo: func(meterInfo commerce.MeterInfo) (map[string]*AzurePricing, error) {
+				return convertMeterToPricings(meterInfo, regions, baseCPUPrice)
+			},
+		}
+		// The price sheet can take 5 minutes to generate, so we don't
+		// want to hang onto the lock while we're waiting for it.
+		go func() {
+			ctx := context.Background()
+			allPrices, err := downloader.GetPricing(ctx)
+
+			az.DownloadPricingDataLock.Lock()
+			defer az.DownloadPricingDataLock.Unlock()
+			if err != nil {
+				log.Errorf("Error downloading Azure price sheet: %s", err)
+				az.priceSheetPricingError = err
+				return
 			}
+			addAzureFilePricing(allPrices, regions)
+			az.Pricing = allPrices
+			az.pricingSource = priceSheetPricingSource
+			az.priceSheetPricingError = nil
+		}()
+	}
 
-			if strings.Contains(meterCategory, "Virtual Machines") {
-
-				usageType := ""
-				if !strings.Contains(meterName, "Low Priority") {
-					usageType = "ondemand"
-				} else {
-					usageType = "preemptible"
-				}
+	return nil
+}
 
-				var instanceTypes []string
-				name := strings.TrimSuffix(meterName, " Low Priority")
-				instanceType := strings.Split(name, "/")
-				for _, it := range instanceType {
-					if strings.Contains(meterSubCategory, "Promo") {
-						it = it + " Promo"
-					}
-					instanceTypes = append(instanceTypes, strings.Replace(it, " ", "_", 1))
-				}
+func convertMeterToPricings(info commerce.MeterInfo, regions map[string]string, baseCPUPrice string) (map[string]*AzurePricing, error) {
+	meterName := *info.MeterName
+	meterRegion := *info.MeterRegion
+	meterCategory := *info.MeterCategory
+	meterSubCategory := *info.MeterSubCategory
 
-				instanceTypes = transformMachineType(meterSubCategory, instanceTypes)
-				if strings.Contains(name, "Expired") {
-					instanceTypes = []string{}
-				}
+	region, err := toRegionID(meterRegion, regions)
+	if err != nil {
+		// Skip this meter if we don't recognize the region.
+		return nil, nil
+	}
+
+	if strings.Contains(meterSubCategory, "Windows") {
+		// This meter doesn't correspond to any pricings.
+		return nil, nil
+	}
+
+	if strings.Contains(meterCategory, "Storage") {
+		if strings.Contains(meterSubCategory, "HDD") || strings.Contains(meterSubCategory, "SSD") || strings.Contains(meterSubCategory, "Premium Files") {
+			var storageClass string = ""
+			if strings.Contains(meterName, "P4 ") {
+				storageClass = AzureDiskPremiumSSDStorageClass
+			} else if strings.Contains(meterName, "E4 ") {
+				storageClass = AzureDiskStandardSSDStorageClass
+			} else if strings.Contains(meterName, "S4 ") {
+				storageClass = AzureDiskStandardStorageClass
+			} else if strings.Contains(meterName, "LRS Provisioned") {
+				storageClass = AzureFilePremiumStorageClass
+			}
 
+			if storageClass != "" {
 				var priceInUsd float64
 
-				if len(v.MeterRates) < 1 {
-					log.Warnf("missing rate info %+v", map[string]interface{}{"MeterSubCategory": *v.MeterSubCategory, "region": region})
-					continue
+				if len(info.MeterRates) < 1 {
+					return nil, fmt.Errorf("missing rate info %+v", map[string]interface{}{"MeterSubCategory": *info.MeterSubCategory, "region": region})
 				}
-				for _, rate := range v.MeterRates {
+				for _, rate := range info.MeterRates {
 					priceInUsd += *rate
 				}
-				priceStr := fmt.Sprintf("%f", priceInUsd)
-				for _, instanceType := range instanceTypes {
-
-					key := fmt.Sprintf("%s,%s,%s", region, instanceType, usageType)
-
-					allPrices[key] = &AzurePricing{
-						Node: &Node{
-							Cost:         priceStr,
-							BaseCPUPrice: baseCPUPrice,
-							UsageType:    usageType,
+				// The meter rate prices a 32 GiB disk per month; convert it to a price per GiB-hour (730 hours/month, 32 GiB).
+				pricePerHour := priceInUsd / 730.0 / 32.0
+				priceStr := fmt.Sprintf("%f", pricePerHour)
+
+				key := region + "," + storageClass
+				log.Debugf("Adding PV.Key: %s, Cost: %s", key, priceStr)
+				return map[string]*AzurePricing{
+					key: {
+						PV: &PV{
+							Cost:   priceStr,
+							Region: region,
 						},
-					}
-				}
+					},
+				}, nil
 			}
 		}
 	}
 
+	if !strings.Contains(meterCategory, "Virtual Machines") {
+		return nil, nil
+	}
+
+	usageType := ""
+	if !strings.Contains(meterName, "Low Priority") {
+		usageType = "ondemand"
+	} else {
+		usageType = "preemptible"
+	}
+
+	var instanceTypes []string
+	name := strings.TrimSuffix(meterName, " Low Priority")
+	instanceType := strings.Split(name, "/")
+	for _, it := range instanceType {
+		if strings.Contains(meterSubCategory, "Promo") {
+			it = it + " Promo"
+		}
+		instanceTypes = append(instanceTypes, strings.Replace(it, " ", "_", 1))
+	}
+
+	instanceTypes = transformMachineType(meterSubCategory, instanceTypes)
+	if strings.Contains(name, "Expired") {
+		instanceTypes = []string{}
+	}
+
+	var priceInUsd float64
+
+	if len(info.MeterRates) < 1 {
+		return nil, fmt.Errorf("missing rate info %+v", map[string]interface{}{"MeterSubCategory": *info.MeterSubCategory, "region": region})
+	}
+	for _, rate := range info.MeterRates {
+		priceInUsd += *rate
+	}
+	priceStr := fmt.Sprintf("%f", priceInUsd)
+	results := make(map[string]*AzurePricing)
+	for _, instanceType := range instanceTypes {
+
+		key := fmt.Sprintf("%s,%s,%s", region, instanceType, usageType)
+		pricing := &AzurePricing{
+			Node: &Node{
+				Cost:         priceStr,
+				BaseCPUPrice: baseCPUPrice,
+				UsageType:    usageType,
+			},
+		}
+		results[key] = pricing
+	}
+	return results, nil
+
+}
+
+func addAzureFilePricing(prices map[string]*AzurePricing, regions map[string]string) {
 	// There is no easy way of supporting Standard Azure-File, because it's billed per used GB
 	// this will set the price to "0" as a workaround to not spam with `Persistent Volume pricing not found for` error
 	// check https://github.com/opencost/opencost/issues/159 for more information (same problem on AWS)
@@ -955,17 +1027,13 @@ func (az *Azure) DownloadPricingData() error {
 	for region := range regions {
 		key := region + "," + AzureFileStandardStorageClass
 		log.Debugf("Adding PV.Key: %s, Cost: %s", key, zeroPrice)
-		allPrices[key] = &AzurePricing{
+		prices[key] = &AzurePricing{
 			PV: &PV{
 				Cost:   zeroPrice,
 				Region: region,
 			},
 		}
 	}
-
-	az.Pricing = allPrices
-	az.RateCardPricingError = nil
-	return nil
 }
 
 // determineCloudByRegion uses region name to pick the correct Cloud Environment for the azure provider to use
@@ -1205,7 +1273,7 @@ func (az *Azure) getDisks() ([]*compute.Disk, error) {
 		credentialsConfig := NewClientCredentialsConfig(config.AzureClientID, config.AzureClientSecret, config.AzureTenantID, azureEnv)
 		a, err := credentialsConfig.Authorizer()
 		if err != nil {
-			az.RateCardPricingError = err
+			az.rateCardPricingError = err
 			return nil, err
 		}
 		authorizer = a
@@ -1217,7 +1285,7 @@ func (az *Azure) getDisks() ([]*compute.Disk, error) {
 		if err != nil {
 			a, err := auth.NewAuthorizerFromFile(azureEnv.ResourceManagerEndpoint)
 			if err != nil {
-				az.RateCardPricingError = err
+				az.rateCardPricingError = err
 				return nil, err
 			}
 			authorizer = a
@@ -1472,18 +1540,23 @@ func (az *Azure) ServiceAccountStatus() *ServiceAccountStatus {
 	return az.serviceAccountChecks.getStatus()
 }
 
-const rateCardPricingSource = "Rate Card API"
+const (
+	rateCardPricingSource   = "Rate Card API"
+	priceSheetPricingSource = "Price Sheet API"
+)
 
 // PricingSourceStatus returns the status of the rate card api
 func (az *Azure) PricingSourceStatus() map[string]*PricingSource {
+	az.DownloadPricingDataLock.Lock()
+	defer az.DownloadPricingDataLock.Unlock()
 	sources := make(map[string]*PricingSource)
 	errMsg := ""
-	if az.RateCardPricingError != nil {
-		errMsg = az.RateCardPricingError.Error()
+	if az.rateCardPricingError != nil {
+		errMsg = az.rateCardPricingError.Error()
 	}
 	rcps := &PricingSource{
 		Name:    rateCardPricingSource,
-		Enabled: true,
+		Enabled: az.pricingSource == rateCardPricingSource,
 		Error:   errMsg,
 	}
 	if rcps.Error != "" {
@@ -1494,7 +1567,29 @@ func (az *Azure) PricingSourceStatus() map[string]*PricingSource {
 	} else {
 		rcps.Available = true
 	}
+
+	errMsg = ""
+	if az.priceSheetPricingError != nil {
+		errMsg = az.priceSheetPricingError.Error()
+	}
+	psps := &PricingSource{
+		Name:    priceSheetPricingSource,
+		Enabled: az.pricingSource == priceSheetPricingSource,
+		Error:   errMsg,
+	}
+	if psps.Error != "" {
+		psps.Available = false
+	} else if len(az.Pricing) == 0 {
+		psps.Error = "No Pricing Data Available"
+		psps.Available = false
+	} else if env.GetAzureBillingAccount() == "" {
+		psps.Error = "No Azure Billing Account ID"
+		psps.Available = false
+	} else {
+		psps.Available = true
+	}
 	sources[rateCardPricingSource] = rcps
+	sources[priceSheetPricingSource] = psps
 	return sources
 }
 

+ 59 - 0
pkg/cloud/azureprovider_test.go

@@ -2,6 +2,9 @@ package cloud
 
 import (
 	"testing"
+
+	"github.com/Azure/azure-sdk-for-go/services/preview/commerce/mgmt/2015-06-01-preview/commerce"
+	"github.com/stretchr/testify/require"
 )
 
 func TestParseAzureSubscriptionID(t *testing.T) {
@@ -34,3 +37,59 @@ func TestParseAzureSubscriptionID(t *testing.T) {
 		}
 	}
 }
+
+func TestConvertMeterToPricings(t *testing.T) {
+	regions := map[string]string{
+		"useast":             "US East",
+		"japanwest":          "Japan West",
+		"australiasoutheast": "Australia Southeast",
+		"norwaywest":         "Norway West",
+	}
+	baseCPUPrice := "0.30000"
+
+	meterInfo := func(category, subcategory, name, region string, rate float64) commerce.MeterInfo {
+		return commerce.MeterInfo{
+			MeterCategory:    &category,
+			MeterSubCategory: &subcategory,
+			MeterName:        &name,
+			MeterRegion:      &region,
+			MeterRates:       map[string]*float64{"0": &rate},
+		}
+	}
+
+	t.Run("windows", func(t *testing.T) {
+		info := meterInfo("Virtual Machines", "D2 Series Windows", "D2s v3", "AU Southeast", 0.3)
+		results, err := convertMeterToPricings(info, regions, baseCPUPrice)
+		require.NoError(t, err)
+		require.Nil(t, results)
+	})
+
+	t.Run("storage", func(t *testing.T) {
+		info := meterInfo("Storage", "Some SSD type", "P4 are good", "US East", 2000)
+		results, err := convertMeterToPricings(info, regions, baseCPUPrice)
+		require.NoError(t, err)
+
+		expected := map[string]*AzurePricing{
+			"useast,premium_ssd": {
+				PV: &PV{Cost: "0.085616", Region: "useast"},
+			},
+		}
+		require.Equal(t, expected, results)
+	})
+
+	t.Run("virtual machines", func(t *testing.T) {
+		info := meterInfo("Virtual Machines", "Eav4/Easv4 Series", "E96a v4/E96as v4 Low Priority", "JA West", 10)
+		results, err := convertMeterToPricings(info, regions, baseCPUPrice)
+		require.NoError(t, err)
+
+		expected := map[string]*AzurePricing{
+			"japanwest,Standard_E96a_v4,preemptible": {
+				Node: &Node{Cost: "10.000000", BaseCPUPrice: "0.30000", UsageType: "preemptible"},
+			},
+			"japanwest,Standard_E96as_v4,preemptible": {
+				Node: &Node{Cost: "10.000000", BaseCPUPrice: "0.30000", UsageType: "preemptible"},
+			},
+		}
+		require.Equal(t, expected, results)
+	})
+}

+ 15 - 5
pkg/cloud/provider.go

@@ -4,9 +4,8 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
-	"golang.org/x/text/cases"
-	"golang.org/x/text/language"
 	"io"
+	"net"
 	"net/http"
 	"regexp"
 	"strconv"
@@ -14,6 +13,9 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/text/cases"
+	"golang.org/x/text/language"
+
 	"github.com/opencost/opencost/pkg/kubecost"
 
 	"github.com/opencost/opencost/pkg/util"
@@ -209,6 +211,7 @@ type CustomPricing struct {
 	AzureClientSecret            string `json:"azureClientSecret"`
 	AzureTenantID                string `json:"azureTenantID"`
 	AzureBillingRegion           string `json:"azureBillingRegion"`
+	AzureBillingAccount          string `json:"azureBillingAccount"`
 	AzureOfferDurableID          string `json:"azureOfferDurableID"`
 	AzureStorageSubscriptionID   string `json:"azureStorageSubscriptionID"`
 	AzureStorageAccount          string `json:"azureStorageAccount"`
@@ -506,9 +509,16 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string, config *config.
 			clusterRegion:    cp.region,
 			clusterAccountID: cp.accountID,
 			clusterProjectID: cp.projectID,
-			metadataClient: metadata.NewClient(&http.Client{
-				Transport: httputil.NewUserAgentTransport("kubecost", http.DefaultTransport),
-			}),
+			metadataClient: metadata.NewClient(
+				&http.Client{
+					Transport: httputil.NewUserAgentTransport("kubecost", &http.Transport{
+						Dial: (&net.Dialer{
+							Timeout:   2 * time.Second,
+							KeepAlive: 30 * time.Second,
+						}).Dial,
+					}),
+					Timeout: 5 * time.Second,
+				}),
 		}, nil
 	case kubecost.AWSProvider:
 		log.Info("Found ProviderID starting with \"aws\", using AWS Provider")

+ 4 - 1
pkg/costmodel/aggregation.go

@@ -2250,7 +2250,10 @@ func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Reque
 	// IncludeProportionalAssetResourceCosts, if true,
 	includeProportionalAssetResourceCosts := qp.GetBool("includeProportionalAssetResourceCosts", false)
 
-	asr, err := a.Model.QueryAllocation(window, resolution, step, aggregateBy, includeIdle, idleByNode, includeProportionalAssetResourceCosts)
+	// include aggregated labels/annotations if true
+	includeAggregatedMetadata := qp.GetBool("includeAggregatedMetadata", true)
+
+	asr, err := a.Model.QueryAllocation(window, resolution, step, aggregateBy, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata)
 	if err != nil {
 		if strings.Contains(strings.ToLower(err.Error()), "bad request") {
 			WriteError(w, BadRequest(err.Error()))

+ 51 - 47
pkg/costmodel/allocation.go

@@ -13,52 +13,52 @@ import (
 )
 
 const (
-	queryFmtPods                     = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]`
-	queryFmtPodsUID                  = `avg(kube_pod_container_status_running{}) by (pod, namespace, uid, %s)[%s:%s]`
-	queryFmtRAMBytesAllocated        = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s, provider_id)`
-	queryFmtRAMRequests              = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtRAMUsageAvg              = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtRAMUsageMax              = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtCPUCoresAllocated        = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtCPURequests              = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtCPUUsageAvg              = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtGPUsRequested            = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtGPUsAllocated            = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtNodeCostPerCPUHr         = `avg(avg_over_time(node_cpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
-	queryFmtNodeCostPerRAMGiBHr      = `avg(avg_over_time(node_ram_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
-	queryFmtNodeCostPerGPUHr         = `avg(avg_over_time(node_gpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
-	queryFmtNodeIsSpot               = `avg_over_time(kubecost_node_is_spot[%s])`
-	queryFmtPVCInfo                  = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
-	queryFmtPodPVCAllocation         = `avg(avg_over_time(pod_pvc_allocation[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
-	queryFmtPVCBytesRequested        = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s])) by (persistentvolumeclaim, namespace, %s)`
-	queryFmtPVActiveMins             = `count(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%s]`
-	queryFmtPVBytes                  = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (persistentvolume, %s)`
-	queryFmtPVCostPerGiBHour         = `avg(avg_over_time(pv_hourly_cost[%s])) by (volumename, %s)`
-	queryFmtNetZoneGiB               = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
-	queryFmtNetZoneCostPerGiB        = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s])) by (%s)`
-	queryFmtNetRegionGiB             = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
-	queryFmtNetRegionCostPerGiB      = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s])) by (%s)`
-	queryFmtNetInternetGiB           = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
-	queryFmtNetInternetCostPerGiB    = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
-	queryFmtNetReceiveBytes          = `sum(increase(container_network_receive_bytes_total{pod!=""}[%s])) by (pod_name, pod, namespace, %s)`
-	queryFmtNetTransferBytes         = `sum(increase(container_network_transmit_bytes_total{pod!=""}[%s])) by (pod_name, pod, namespace, %s)`
-	queryFmtNodeLabels               = `avg_over_time(kube_node_labels[%s])`
-	queryFmtNamespaceLabels          = `avg_over_time(kube_namespace_labels[%s])`
-	queryFmtNamespaceAnnotations     = `avg_over_time(kube_namespace_annotations[%s])`
-	queryFmtPodLabels                = `avg_over_time(kube_pod_labels[%s])`
-	queryFmtPodAnnotations           = `avg_over_time(kube_pod_annotations[%s])`
-	queryFmtServiceLabels            = `avg_over_time(service_selector_labels[%s])`
-	queryFmtDeploymentLabels         = `avg_over_time(deployment_match_labels[%s])`
-	queryFmtStatefulSetLabels        = `avg_over_time(statefulSet_match_labels[%s])`
-	queryFmtDaemonSetLabels          = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s])) by (pod, owner_name, namespace, %s)`
-	queryFmtJobLabels                = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s])) by (pod, owner_name, namespace ,%s)`
-	queryFmtPodsWithReplicaSetOwner  = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s])) by (pod, owner_name, namespace ,%s)`
-	queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s])) by (replicaset, namespace, %s)`
-	queryFmtLBCostPerHr              = `avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s)`
-	queryFmtLBActiveMins             = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]`
-	queryFmtOldestSample             = `min_over_time(timestamp(group(node_cpu_hourly_cost))[%s:%s])`
-	queryFmtNewestSample             = `max_over_time(timestamp(group(node_cpu_hourly_cost))[%s:%s])`
-
+	queryFmtPods                        = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]`
+	queryFmtPodsUID                     = `avg(kube_pod_container_status_running{}) by (pod, namespace, uid, %s)[%s:%s]`
+	queryFmtRAMBytesAllocated           = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s, provider_id)`
+	queryFmtRAMRequests                 = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtRAMUsageAvg                 = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	queryFmtRAMUsageMax                 = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	queryFmtCPUCoresAllocated           = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtCPURequests                 = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtCPUUsageAvg                 = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	queryFmtGPUsRequested               = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtGPUsAllocated               = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtNodeCostPerCPUHr            = `avg(avg_over_time(node_cpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
+	queryFmtNodeCostPerRAMGiBHr         = `avg(avg_over_time(node_ram_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
+	queryFmtNodeCostPerGPUHr            = `avg(avg_over_time(node_gpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
+	queryFmtNodeIsSpot                  = `avg_over_time(kubecost_node_is_spot[%s])`
+	queryFmtPVCInfo                     = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
+	queryFmtPodPVCAllocation            = `avg(avg_over_time(pod_pvc_allocation[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
+	queryFmtPVCBytesRequested           = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s])) by (persistentvolumeclaim, namespace, %s)`
+	queryFmtPVActiveMins                = `count(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%s]`
+	queryFmtPVBytes                     = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (persistentvolume, %s)`
+	queryFmtPVCostPerGiBHour            = `avg(avg_over_time(pv_hourly_cost[%s])) by (volumename, %s)`
+	queryFmtNetZoneGiB                  = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	queryFmtNetZoneCostPerGiB           = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s])) by (%s)`
+	queryFmtNetRegionGiB                = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	queryFmtNetRegionCostPerGiB         = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s])) by (%s)`
+	queryFmtNetInternetGiB              = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	queryFmtNetInternetCostPerGiB       = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
+	queryFmtNetReceiveBytes             = `sum(increase(container_network_receive_bytes_total{pod!=""}[%s])) by (pod_name, pod, namespace, %s)`
+	queryFmtNetTransferBytes            = `sum(increase(container_network_transmit_bytes_total{pod!=""}[%s])) by (pod_name, pod, namespace, %s)`
+	queryFmtNodeLabels                  = `avg_over_time(kube_node_labels[%s])`
+	queryFmtNamespaceLabels             = `avg_over_time(kube_namespace_labels[%s])`
+	queryFmtNamespaceAnnotations        = `avg_over_time(kube_namespace_annotations[%s])`
+	queryFmtPodLabels                   = `avg_over_time(kube_pod_labels[%s])`
+	queryFmtPodAnnotations              = `avg_over_time(kube_pod_annotations[%s])`
+	queryFmtServiceLabels               = `avg_over_time(service_selector_labels[%s])`
+	queryFmtDeploymentLabels            = `avg_over_time(deployment_match_labels[%s])`
+	queryFmtStatefulSetLabels           = `avg_over_time(statefulSet_match_labels[%s])`
+	queryFmtDaemonSetLabels             = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s])) by (pod, owner_name, namespace, %s)`
+	queryFmtJobLabels                   = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s])) by (pod, owner_name, namespace ,%s)`
+	queryFmtPodsWithReplicaSetOwner     = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s])) by (pod, owner_name, namespace ,%s)`
+	queryFmtReplicaSetsWithoutOwners    = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s])) by (replicaset, namespace, %s)`
+	queryFmtReplicaSetsWithRolloutOwner = `avg(avg_over_time(kube_replicaset_owner{owner_kind="Rollout"}[%s])) by (replicaset, namespace, owner_kind, owner_name, %s)`
+	queryFmtLBCostPerHr                 = `avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s)`
+	queryFmtLBActiveMins                = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]`
+	queryFmtOldestSample                = `min_over_time(timestamp(group(node_cpu_hourly_cost))[%s:%s])`
+	queryFmtNewestSample                = `max_over_time(timestamp(group(node_cpu_hourly_cost))[%s:%s])`
 
 	// Because we use container_cpu_usage_seconds_total to calculate CPU usage
 	// at any given "instant" of time, we need to use an irate or rate. To then
@@ -504,6 +504,9 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, env.GetPromClusterLabel())
 	resChReplicaSetsWithoutOwners := ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
 
+	queryReplicaSetsWithRolloutOwner := fmt.Sprintf(queryFmtReplicaSetsWithRolloutOwner, durStr, env.GetPromClusterLabel())
+	resChReplicaSetsWithRolloutOwner := ctx.QueryAtTime(queryReplicaSetsWithRolloutOwner, end)
+
 	queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, env.GetPromClusterLabel())
 	resChJobLabels := ctx.QueryAtTime(queryJobLabels, end)
 
@@ -561,6 +564,7 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
 	resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
 	resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
+	resReplicaSetsWithRolloutOwner, _ := resChReplicaSetsWithRolloutOwner.Await()
 	resJobLabels, _ := resChJobLabels.Await()
 	resLBCostPerHr, _ := resChLBCostPerHr.Await()
 	resLBActiveMins, _ := resChLBActiveMins.Await()
@@ -616,7 +620,7 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
 	podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels, podUIDKeyMap, ingestPodUID)
 	podJobMap := resToPodJobMap(resJobLabels, podUIDKeyMap, ingestPodUID)
-	podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners, podUIDKeyMap, ingestPodUID)
+	podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners, resReplicaSetsWithRolloutOwner, podUIDKeyMap, ingestPodUID)
 	applyControllersToPods(podMap, podDeploymentMap)
 	applyControllersToPods(podMap, podStatefulSetMap)
 	applyControllersToPods(podMap, podDaemonSetMap)

+ 25 - 9
pkg/costmodel/allocation_helpers.go

@@ -1156,13 +1156,16 @@ func resToPodJobMap(resJobLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]
 	return jobLabels
 }
 
-func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resReplicaSetsWithoutOwners []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
+func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resReplicaSetsWithoutOwners []*prom.QueryResult, resReplicaSetsWithRolloutOwner []*prom.QueryResult, podUIDKeyMap map[podKey][]podKey, ingestPodUID bool) map[podKey]controllerKey {
 	// Build out set of ReplicaSets that have no owners, themselves, such that
 	// the ReplicaSet should be used as the owner of the Pods it controls.
 	// (This should exclude, for example, ReplicaSets that are controlled by
 	// Deployments, in which case the Deployment should be the pod's owner.)
+	// Additionally, add to this set of ReplicaSets those ReplicaSets that
+	// are owned by a Rollout
 	replicaSets := map[controllerKey]struct{}{}
 
+	// Create unowned ReplicaSet controller keys
 	for _, res := range resReplicaSetsWithoutOwners {
 		controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
 		if err != nil {
@@ -1172,17 +1175,34 @@ func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resRe
 		replicaSets[controllerKey] = struct{}{}
 	}
 
+	// Create Rollout-owned ReplicaSet controller keys
+	for _, res := range resReplicaSetsWithRolloutOwner {
+		controllerKey, err := resultReplicaSetRolloutKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
+		if err != nil {
+			continue
+		}
+
+		replicaSets[controllerKey] = struct{}{}
+	}
+
 	// Create the mapping of Pods to ReplicaSets, ignoring any ReplicaSets that
-	// to not appear in the set of uncontrolled ReplicaSets above.
+	// do not appear in the set of unowned/Rollout-owned ReplicaSets above.
 	podToReplicaSet := map[podKey]controllerKey{}
 
 	for _, res := range resPodsWithReplicaSetOwner {
+		// First, check if this pod's owning ReplicaSet is one with no owner of its own
 		controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
 		if err != nil {
 			continue
-		}
-		if _, ok := replicaSets[controllerKey]; !ok {
-			continue
+		} else if _, ok := replicaSets[controllerKey]; !ok {
+			// If the pod is not owned by an unowned ReplicaSet, check if
+			// it's owned by a Rollout-owned ReplicaSet
+			controllerKey, err = resultReplicaSetRolloutKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
+			if err != nil {
+				continue
+			} else if _, ok := replicaSets[controllerKey]; !ok {
+				continue
+			}
 		}
 
 		pod, err := res.GetString("pod")
@@ -1196,18 +1216,14 @@ func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resRe
 
 		if ingestPodUID {
 			if uidKeys, ok := podUIDKeyMap[key]; ok {
-
 				keys = append(keys, uidKeys...)
-
 			}
 		} else {
 			keys = []podKey{key}
 		}
 
 		for _, key := range keys {
-
 			podToReplicaSet[key] = controllerKey
-
 		}
 	}
 

+ 11 - 5
pkg/costmodel/assets.go

@@ -43,7 +43,7 @@ func (cm *CostModel) ComputeAssets(start, end time.Time) (*kubecost.AssetSet, er
 		hours := e.Sub(s).Hours()
 
 		disk := kubecost.NewDisk(d.Name, d.Cluster, d.ProviderID, s, e, kubecost.NewWindow(&start, &end))
-		cm.propertiesFromCluster(disk.Properties)
+		cm.PropertiesFromCluster(disk.Properties)
 		disk.Cost = d.Cost
 		disk.ByteHours = d.Bytes * hours
 		if d.BytesUsedAvgPtr != nil {
@@ -85,12 +85,18 @@ func (cm *CostModel) ComputeAssets(start, end time.Time) (*kubecost.AssetSet, er
 		}
 
 		loadBalancer := kubecost.NewLoadBalancer(lb.Name, lb.Cluster, lb.ProviderID, s, e, kubecost.NewWindow(&start, &end))
-		cm.propertiesFromCluster(loadBalancer.Properties)
+		cm.PropertiesFromCluster(loadBalancer.Properties)
 		loadBalancer.Cost = lb.Cost
 		assetSet.Insert(loadBalancer, nil)
 	}
 
 	for _, n := range nodeMap {
+		// Skip nodes running on EKS Fargate, identified by the compute-type label.
+		if n.Labels != nil {
+			if value, ok := n.Labels["label_eks_amazonaws_com_compute_type"]; ok && value == "fargate" {
+				continue
+			}
+		}
 		s := n.Start
 		if s.Before(start) || s.After(end) {
 			log.Debugf("CostModel.ComputeAssets: node '%s' start outside window: %s not in [%s, %s]", n.Name, s.Format("2006-01-02T15:04:05"), start.Format("2006-01-02T15:04:05"), end.Format("2006-01-02T15:04:05"))
@@ -106,7 +112,7 @@ func (cm *CostModel) ComputeAssets(start, end time.Time) (*kubecost.AssetSet, er
 		hours := e.Sub(s).Hours()
 
 		node := kubecost.NewNode(n.Name, n.Cluster, n.ProviderID, s, e, kubecost.NewWindow(&start, &end))
-		cm.propertiesFromCluster(node.Properties)
+		cm.PropertiesFromCluster(node.Properties)
 		node.NodeType = n.NodeType
 		node.CPUCoreHours = n.CPUCores * hours
 		node.RAMByteHours = n.RAMBytes * hours
@@ -151,7 +157,7 @@ func (cm *CostModel) ClusterNodes(start, end time.Time) (map[NodeIdentifier]*Nod
 }
 
 // PropertiesFromCluster populates static cluster properties to individual asset properties
-func (cm *CostModel) propertiesFromCluster(props *kubecost.AssetProperties) {
+func (cm *CostModel) PropertiesFromCluster(props *kubecost.AssetProperties) {
 	// If properties does not have cluster value, do nothing
 	if props.Cluster == "" {
 		return
@@ -160,7 +166,7 @@ func (cm *CostModel) propertiesFromCluster(props *kubecost.AssetProperties) {
 	clusterMap := cm.ClusterMap.AsMap()
 	ci, ok := clusterMap[props.Cluster]
 	if !ok {
-		log.Debugf("CostMode.propertiesFromCluster: cluster '%s' was not found in ClusterMap", props.Cluster)
+		log.Debugf("CostMode.PropertiesFromCluster: cluster '%s' was not found in ClusterMap", props.Cluster)
 		return
 	}
 

+ 2 - 1
pkg/costmodel/costmodel.go

@@ -2295,7 +2295,7 @@ func measureTimeAsync(start time.Time, threshold time.Duration, name string, ch
 	}
 }
 
-func (cm *CostModel) QueryAllocation(window kubecost.Window, resolution, step time.Duration, aggregate []string, includeIdle, idleByNode, includeProportionalAssetResourceCosts bool) (*kubecost.AllocationSetRange, error) {
+func (cm *CostModel) QueryAllocation(window kubecost.Window, resolution, step time.Duration, aggregate []string, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata bool) (*kubecost.AllocationSetRange, error) {
 	// Validate window is legal
 	if window.IsOpen() || window.IsNegative() {
 		return nil, fmt.Errorf("illegal window: %s", window)
@@ -2347,6 +2347,7 @@ func (cm *CostModel) QueryAllocation(window kubecost.Window, resolution, step ti
 	opts := &kubecost.AllocationAggregationOptions{
 		IncludeProportionalAssetResourceCosts: includeProportionalAssetResourceCosts,
 		IdleByNode:                            idleByNode,
+		IncludeAggregatedMetadata:             includeAggregatedMetadata,
 	}
 
 	// Aggregate

+ 6 - 0
pkg/costmodel/key.go

@@ -243,6 +243,12 @@ func resultReplicaSetKey(res *prom.QueryResult, clusterLabel, namespaceLabel, co
 	return resultControllerKey("replicaset", res, clusterLabel, namespaceLabel, controllerLabel)
 }
 
+// resultReplicaSetRolloutKey creates a controllerKey for a Rollout.
+// (See resultControllerKey for more.)
+func resultReplicaSetRolloutKey(res *prom.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("rollout", res, clusterLabel, namespaceLabel, controllerLabel)
+}
+
 type serviceKey struct {
 	Cluster   string
 	Namespace string

+ 18 - 0
pkg/env/costmodelenv.go

@@ -18,6 +18,9 @@ const (
 	AlibabaAccessKeyIDEnvVar     = "ALIBABA_ACCESS_KEY_ID"
 	AlibabaAccessKeySecretEnvVar = "ALIBABA_SECRET_ACCESS_KEY"
 
+	AzureOfferIDEnvVar        = "AZURE_OFFER_ID"
+	AzureBillingAccountEnvVar = "AZURE_BILLING_ACCOUNT"
+
 	KubecostNamespaceEnvVar        = "KUBECOST_NAMESPACE"
 	PodNameEnvVar                  = "POD_NAME"
 	ClusterIDEnvVar                = "CLUSTER_ID"
@@ -228,6 +231,21 @@ func GetAlibabaAccessKeySecret() string {
 	return Get(AlibabaAccessKeySecretEnvVar, "")
 }
 
+// GetAzureOfferID returns the environment variable value for AzureOfferIDEnvVar which represents
+// the Azure offer ID for determining prices.
+func GetAzureOfferID() string {
+	return Get(AzureOfferIDEnvVar, "")
+}
+
+// GetAzureBillingAccount returns the environment variable value for
+// AzureBillingAccountEnvVar which represents the Azure billing
+// account for determining prices. If this is specified
+// customer-specific prices will be downloaded from the consumption
+// price sheet API.
+func GetAzureBillingAccount() string {
+	return Get(AzureBillingAccountEnvVar, "")
+}
+
 // GetKubecostNamespace returns the environment variable value for KubecostNamespaceEnvVar which
 // represents the namespace the cost model exists in.
 func GetKubecostNamespace() string {

+ 5 - 0
pkg/kubecost/allocation.go

@@ -921,6 +921,7 @@ type AllocationAggregationOptions struct {
 	ShareSplit                            string
 	SharedHourlyCosts                     map[string]float64
 	SplitIdle                             bool
+	IncludeAggregatedMetadata             bool
 }
 
 // AggregateBy aggregates the Allocations in the given AllocationSet by the given
@@ -1041,6 +1042,10 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 	// them to their respective sets, removing them from the set of allocations
 	// to aggregate.
 	for _, alloc := range as.Allocations {
+		// Record whether aggregated labels/annotations should be returned, so
+		// downstream property intersection knows to keep or drop the metadata.
+		alloc.Properties.AggregatedMetadata = options.IncludeAggregatedMetadata
+
 		// External allocations get aggregated post-hoc (see step 6) and do
 		// not necessarily contain complete sets of properties, so they are
 		// moved to a separate AllocationSet.

+ 41 - 0
pkg/kubecost/allocationprops.go

@@ -103,6 +103,9 @@ type AllocationProperties struct {
 	ProviderID     string                `json:"providerID,omitempty"`
 	Labels         AllocationLabels      `json:"labels,omitempty"`
 	Annotations    AllocationAnnotations `json:"annotations,omitempty"`
+	// When set to true, maintain the intersection of all labels + annotations
+	// in the aggregated AllocationProperties object
+	AggregatedMetadata bool `json:"-"`
 }
 
 // AllocationLabels is a schema-free mapping of key/value pairs that can be
@@ -439,7 +442,31 @@ func (p *AllocationProperties) Intersection(that *AllocationProperties) *Allocat
 		intersectionProps.ControllerKind = p.ControllerKind
 	}
 	if p.Namespace == that.Namespace {
+
 		intersectionProps.Namespace = p.Namespace
+		// ignore the incoming labels from unallocated or unmounted special case pods
+		if p.AggregatedMetadata || that.AggregatedMetadata {
+			intersectionProps.AggregatedMetadata = true
+
+			// When aggregating by metadata, we maintain the intersection of the labels/annotations
+			// of the two AllocationProperties objects being intersected here.
+			// Special case unallocated/unmounted Allocations never have any labels or annotations.
+			// As a result, they have the effect of always clearing out the intersection,
+			// regardless if all the other actual allocations/etc have them.
+			// This logic is designed to effectively ignore the unmounted/unallocated objects
+			// and just copy over the labels from the other object - we only take the intersection
+			// of 'legitimate' allocations.
+			if p.Container == UnmountedSuffix {
+				intersectionProps.Annotations = that.Annotations
+				intersectionProps.Labels = that.Labels
+			} else if that.Container == UnmountedSuffix {
+				intersectionProps.Annotations = p.Annotations
+				intersectionProps.Labels = p.Labels
+			} else {
+				intersectionProps.Annotations = mapIntersection(p.Annotations, that.Annotations)
+				intersectionProps.Labels = mapIntersection(p.Labels, that.Labels)
+			}
+		}
 	}
 	if p.Pod == that.Pod {
 		intersectionProps.Pod = p.Pod
@@ -450,6 +477,20 @@ func (p *AllocationProperties) Intersection(that *AllocationProperties) *Allocat
 	return intersectionProps
 }
 
+func mapIntersection(map1, map2 map[string]string) map[string]string {
+	result := make(map[string]string)
+	for key, value := range map1 {
+		if value2, ok := map2[key]; ok {
+			if value2 == value {
+				result[key] = value
+			}
+		}
+
+	}
+
+	return result
+}
+
 func (p *AllocationProperties) String() string {
 	if p == nil {
 		return "<nil>"

+ 104 - 0
pkg/kubecost/allocationprops_test.go

@@ -5,6 +5,110 @@ import (
 	"testing"
 )
 
+func TestAllocationPropsIntersection(t *testing.T) {
+	cases := map[string]struct {
+		allocationProps1 *AllocationProperties
+		allocationProps2 *AllocationProperties
+		expected         *AllocationProperties
+	}{
+		"intersection two allocation properties with empty labels/annotations": {
+			allocationProps1: &AllocationProperties{
+				Labels:      map[string]string{},
+				Annotations: map[string]string{},
+			},
+			allocationProps2: &AllocationProperties{
+				Labels:      map[string]string{},
+				Annotations: map[string]string{},
+			},
+			expected: &AllocationProperties{
+				Labels:      nil,
+				Annotations: nil,
+			},
+		},
+		"nil intersection": {
+			allocationProps1: nil,
+			allocationProps2: nil,
+			expected:         nil,
+		},
+		"intersection, with labels/annotations, no aggregated metdata": {
+			allocationProps1: &AllocationProperties{
+				AggregatedMetadata: false,
+				Node:               "node1",
+				Labels:             map[string]string{"key1": "val1"},
+				Annotations:        map[string]string{"key2": "val2"},
+			},
+			allocationProps2: &AllocationProperties{
+				AggregatedMetadata: false,
+				Node:               "node1",
+				Labels:             map[string]string{"key3": "val3"},
+				Annotations:        map[string]string{"key4": "val4"},
+			},
+			expected: &AllocationProperties{
+				AggregatedMetadata: false,
+				Node:               "node1",
+				Labels:             nil,
+				Annotations:        nil,
+			},
+		},
+		"intersection, with labels/annotations, with aggregated metdata": {
+			allocationProps1: &AllocationProperties{
+				AggregatedMetadata: false,
+				ControllerKind:     "controller1",
+				Namespace:          "ns1",
+				Labels:             map[string]string{"key1": "val1"},
+				Annotations:        map[string]string{"key2": "val2"},
+			},
+			allocationProps2: &AllocationProperties{
+				AggregatedMetadata: true,
+				ControllerKind:     "controller2",
+				Namespace:          "ns1",
+				Labels:             map[string]string{"key1": "val1"},
+				Annotations:        map[string]string{"key2": "val2"},
+			},
+			expected: &AllocationProperties{
+				AggregatedMetadata: true,
+				Namespace:          "ns1",
+				ControllerKind:     "",
+				Labels:             map[string]string{"key1": "val1"},
+				Annotations:        map[string]string{"key2": "val2"},
+			},
+		},
+		"intersection, with labels/annotations, special case container": {
+			allocationProps1: &AllocationProperties{
+				AggregatedMetadata: false,
+				Container:          UnmountedSuffix,
+				Namespace:          "ns1",
+				Labels:             map[string]string{},
+				Annotations:        map[string]string{},
+			},
+			allocationProps2: &AllocationProperties{
+				AggregatedMetadata: true,
+				Container:          "container3",
+				Namespace:          "ns1",
+				Labels:             map[string]string{"key1": "val1"},
+				Annotations:        map[string]string{"key2": "val2"},
+			},
+			expected: &AllocationProperties{
+				AggregatedMetadata: true,
+				Namespace:          "ns1",
+				ControllerKind:     "",
+				Labels:             map[string]string{"key1": "val1"},
+				Annotations:        map[string]string{"key2": "val2"},
+			},
+		},
+	}
+
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+
+			actual := tc.allocationProps1.Intersection(tc.allocationProps2)
+
+			if !reflect.DeepEqual(actual, tc.expected) {
+				t.Fatalf("test case %s: expected %+v; got %+v", name, tc.expected, actual)
+			}
+		})
+	}
+}
 func TestGenerateKey(t *testing.T) {
 
 	customOwnerLabelConfig := NewLabelConfig()