Browse Source

Merge branch 'develop' into feature/kubemodel-2

Sean Holcomb 14 hours ago
parent
commit
2b35e243e7

+ 8 - 4
pkg/cloud/aws/provider.go

@@ -17,6 +17,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/aws/smithy-go"
 	"github.com/aws/smithy-go"
+	"github.com/opencost/opencost/pkg/cloud/httputil"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/utils"
 	"github.com/opencost/opencost/pkg/cloud/utils"
 
 
@@ -858,7 +859,10 @@ func (aws *AWS) getRegionPricing(nodeList []*clustercache.Node) (*http.Response,
 	}
 	}
 
 
 	log.Infof("starting download of \"%s\", which is quite large ...", pricingURL)
 	log.Infof("starting download of \"%s\", which is quite large ...", pricingURL)
-	resp, err := http.Get(pricingURL)
+	// This file is large and can take a while to stream, so the streaming client
+	// bounds connect/TLS/response-header time but not the total body read - enough
+	// to bail on a hung endpoint without truncating a legitimate slow download.
+	resp, err := httputil.StreamingGet(context.Background(), pricingURL)
 	if err != nil {
 	if err != nil {
 		log.Errorf("Bogus fetch of \"%s\": %v", pricingURL, err)
 		log.Errorf("Bogus fetch of \"%s\": %v", pricingURL, err)
 		return nil, pricingURL, err
 		return nil, pricingURL, err
@@ -1352,7 +1356,7 @@ func (aws *AWS) spotPricingFromHistory(k models.Key) (*SpotPriceHistoryEntry, bo
 
 
 	price, err := aws.SpotPriceHistoryCache.GetSpotPrice(region, instanceType, availabilityZone)
 	price, err := aws.SpotPriceHistoryCache.GetSpotPrice(region, instanceType, availabilityZone)
 	if err != nil {
 	if err != nil {
-		log.DedupedWarningf(10, "Failed to get spot price history for instance %s: %s", k.ID(), err.Error())
+		log.Debugf("Failed to get spot price history for instance %s: %s", k.ID(), err.Error())
 		return nil, false
 		return nil, false
 	}
 	}
 	return price, true
 	return price, true
@@ -1483,7 +1487,7 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
 			UsageType:    PreemptibleType,
 			UsageType:    PreemptibleType,
 		}, meta, nil
 		}, meta, nil
 	} else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
 	} else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
-		log.DedupedWarningf(5, "Node %s marked preemptible but no spot feed data available; falling back to other pricing sources", k.ID())
+		log.Debugf("Node %s marked preemptible but no spot feed data available; falling back to other pricing sources", k.ID())
 
 
 		// Try to get spot pricing from DescribeSpotPriceHistory API
 		// Try to get spot pricing from DescribeSpotPriceHistory API
 		if historyEntry, ok := aws.spotPricingFromHistory(k); ok {
 		if historyEntry, ok := aws.spotPricingFromHistory(k); ok {
@@ -1505,7 +1509,7 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
 
 
 		if publicPricingFound {
 		if publicPricingFound {
 			// return public price if found
 			// return public price if found
-			log.DedupedWarningf(5, "No spot price history available for %s, falling back to on-demand pricing", k.ID())
+			log.Debugf("No spot price history available for %s, falling back to on-demand pricing", k.ID())
 			return &models.Node{
 			return &models.Node{
 				Cost:         cost,
 				Cost:         cost,
 				VCPU:         terms.VCpu,
 				VCPU:         terms.VCpu,

+ 5 - 1
pkg/cloud/azure/pricesheetdownloader.go

@@ -18,6 +18,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
 	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
 
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/pkg/cloud/httputil"
 )
 )
 
 
 type PriceSheetDownloader struct {
 type PriceSheetDownloader struct {
@@ -79,7 +80,10 @@ func (d PriceSheetDownloader) saveData(ctx context.Context, url, tempName string
 		return nil, fmt.Errorf("creating %s temp file: %w", tempName, err)
 		return nil, fmt.Errorf("creating %s temp file: %w", tempName, err)
 	}
 	}
 
 
-	resp, err := http.Get(url)
+	// The price sheet can be large, so the streaming client bounds connect/TLS/
+	// response-header time but not the body read, avoiding truncation of a slow
+	// download. Pass the caller's context so the download is cancelable.
+	resp, err := httputil.StreamingGet(ctx, url)
 	if err != nil {
 	if err != nil {
 		return nil, fmt.Errorf("downloading: %w", err)
 		return nil, fmt.Errorf("downloading: %w", err)
 	}
 	}

+ 79 - 27
pkg/cloud/azure/provider.go

@@ -29,6 +29,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/util/fileutil"
 	"github.com/opencost/opencost/core/pkg/util/fileutil"
 	"github.com/opencost/opencost/core/pkg/util/json"
 	"github.com/opencost/opencost/core/pkg/util/json"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
+	"github.com/opencost/opencost/pkg/cloud/httputil"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/utils"
 	"github.com/opencost/opencost/pkg/cloud/utils"
 	"github.com/opencost/opencost/pkg/env"
 	"github.com/opencost/opencost/pkg/env"
@@ -273,60 +274,92 @@ func buildAzureRetailPricesURL(region string, skuName string, currencyCode strin
 	return pricingURL
 	return pricingURL
 }
 }
 
 
-func extractAzureVMRetailAndSpotPrices(resp *http.Response) (retailPrice string, spotPrice string, err error) {
+func extractAzureVMRetailAndSpotPrices(resp *http.Response) (linuxRetailPrice string, windowsRetailPrice string, spotPrice string, windowsSpotPrice string, err error) {
 	body, err := io.ReadAll(resp.Body)
 	body, err := io.ReadAll(resp.Body)
 	if err != nil {
 	if err != nil {
-		return "", "", fmt.Errorf("Error getting response: %v", err)
+		return "", "", "", "", fmt.Errorf("error getting response: %w", err)
 	}
 	}
 
 
 	pricingPayload := AzureRetailPricing{}
 	pricingPayload := AzureRetailPricing{}
 	jsonErr := json.Unmarshal(body, &pricingPayload)
 	jsonErr := json.Unmarshal(body, &pricingPayload)
 	if jsonErr != nil {
 	if jsonErr != nil {
-		return "", "", fmt.Errorf("error unmarshalling data: %v", jsonErr)
+		return "", "", "", "", fmt.Errorf("error unmarshalling data: %w", jsonErr)
 	}
 	}
 	for _, item := range pricingPayload.Items {
 	for _, item := range pricingPayload.Items {
-		// note: Windows OS ondemand price will be equal to Linux, Adoption of Windows based
-		// computes are increasing in Azure we might want to enhance this in future.
-		if !strings.Contains(item.ProductName, "Windows") {
-			if strings.Contains(strings.ToLower(item.SkuName), " spot") {
+		skuLower := strings.ToLower(item.SkuName)
+		productLower := strings.ToLower(item.ProductName)
+		isWindowsProduct := strings.Contains(productLower, "windows")
+		if strings.Contains(skuLower, " spot") {
+			if isWindowsProduct {
+				windowsSpotPrice = fmt.Sprintf("%f", item.RetailPrice)
+			} else {
 				spotPrice = fmt.Sprintf("%f", item.RetailPrice)
 				spotPrice = fmt.Sprintf("%f", item.RetailPrice)
-			} else if !(strings.Contains(strings.ToLower(item.SkuName), "low priority") || strings.Contains(strings.ToLower(item.ProductName), "cloud services") || strings.Contains(strings.ToLower(item.ProductName), "cloudservices")) {
-				retailPrice = fmt.Sprintf("%f", item.RetailPrice)
+			}
+		} else if !(strings.Contains(skuLower, "low priority") || strings.Contains(productLower, "cloud services") || strings.Contains(productLower, "cloudservices")) {
+			if isWindowsProduct {
+				windowsRetailPrice = fmt.Sprintf("%f", item.RetailPrice)
+			} else {
+				linuxRetailPrice = fmt.Sprintf("%f", item.RetailPrice)
 			}
 			}
 		}
 		}
 	}
 	}
-	return retailPrice, spotPrice, nil
+	return linuxRetailPrice, windowsRetailPrice, spotPrice, windowsSpotPrice, nil
 }
 }
 
 
-func getRetailPrice(region string, skuName string, currencyCode string, spot bool) (string, error) {
+// getRetailPrice fetches the Azure retail price payload for skuName in region
+// (URL built by buildAzureRetailPricesURL), extracts the Linux/Windows retail
+// and spot prices from it, and returns the one chosen by selectRetailPrice for
+// the given spot and isWindows flags.
+//
+// It returns an error when the request fails, when the endpoint answers with a
+// non-2xx status, when the payload cannot be read or parsed, or when
+// selectRetailPrice finds no usable price.
+func getRetailPrice(region string, skuName string, currencyCode string, spot bool, isWindows bool) (string, error) {
 	pricingURL := buildAzureRetailPricesURL(region, skuName, currencyCode)
 	pricingURL := buildAzureRetailPricesURL(region, skuName, currencyCode)
 	log.Infof("starting download retail price payload from \"%s\"", pricingURL)
 	log.Infof("starting download retail price payload from \"%s\"", pricingURL)
 
 
-	resp, err := http.Get(pricingURL)
+	// Single SKU lookup returns a small payload, so the shared bounded client
+	// keeps a hung endpoint from blocking pricing without risking truncation.
+	client := httputil.BoundedClient()
+	resp, err := client.Get(pricingURL)
 	if err != nil {
 	if err != nil {
-		return "", fmt.Errorf("failed to fetch retail price with URL \"%s\": %v", pricingURL, err)
+		return "", fmt.Errorf("failed to fetch retail price with URL \"%s\": %w", pricingURL, err)
 	}
 	}
+	// extractAzureVMRetailAndSpotPrices only reads the body, so close it here on
+	// every return path (including the status-code early return below).
+	defer resp.Body.Close()
 
 
-	if resp.StatusCode < 200 && resp.StatusCode > 299 {
+	// A status is outside the 2xx range when it is below 200 OR above 299. The
+	// previous && condition could never be true, so error responses were parsed
+	// as if they were pricing payloads.
+	if resp.StatusCode < 200 || resp.StatusCode > 299 {
 		return "", fmt.Errorf("retail price responded with error status code %d", resp.StatusCode)
 		return "", fmt.Errorf("retail price responded with error status code %d", resp.StatusCode)
 	}
 	}
 
 
-	retailPrice, spotPrice, err := extractAzureVMRetailAndSpotPrices(resp)
+	linuxRetailPrice, windowsRetailPrice, spotPrice, windowsSpotPrice, err := extractAzureVMRetailAndSpotPrices(resp)
 	if err != nil {
 	if err != nil {
-		return "", fmt.Errorf("failed to extract azure prices: %v", err)
+		return "", fmt.Errorf("failed to extract azure prices: %w", err)
 	}
 	}
 
 
 	log.DedupedInfof(5, "done parsing retail price payload from \"%s\"\n", pricingURL)
 	log.DedupedInfof(5, "done parsing retail price payload from \"%s\"\n", pricingURL)
 
 
-	if spot && spotPrice != "" {
-		return spotPrice, nil
+	return selectRetailPrice(region, skuName, linuxRetailPrice, windowsRetailPrice, spotPrice, windowsSpotPrice, spot, isWindows)
+}
+
+// selectRetailPrice picks the price matching the node OS and pricing model.
+// Windows nodes prefer the Windows-specific price; when it is absent the Linux
+// price is used as a best-effort estimate and the fallback is logged so the
+// substitution is not silent.
+func selectRetailPrice(region, skuName, linuxRetailPrice, windowsRetailPrice, spotPrice, windowsSpotPrice string, spot, isWindows bool) (string, error) {
+	if spot {
+		if isWindows && windowsSpotPrice != "" {
+			return windowsSpotPrice, nil
+		}
+		if spotPrice != "" {
+			if isWindows {
+				log.Warnf("no Windows spot price for %q in %q region; falling back to Linux spot price", skuName, region)
+			}
+			return spotPrice, nil
+		}
 	}
 	}
 
 
-	if retailPrice == "" {
-		return retailPrice, fmt.Errorf("Couldn't find price for product \"%s\" in \"%s\" region", skuName, region)
+	selectedRetail := linuxRetailPrice
+	if isWindows && windowsRetailPrice != "" {
+		selectedRetail = windowsRetailPrice
+	} else if isWindows && linuxRetailPrice != "" {
+		log.Warnf("no Windows retail price for %q in %q region; falling back to Linux retail price", skuName, region)
+	}
+	if selectedRetail == "" {
+		return "", fmt.Errorf("couldn't find price for product %q in %q region", skuName, region)
 	}
 	}
 
 
-	return retailPrice, nil
+	return selectedRetail, nil
 }
 }
 
 
 func toRegionID(meterRegion string, regions map[string]string) (string, error) {
 func toRegionID(meterRegion string, regions map[string]string) (string, error) {
@@ -440,6 +473,17 @@ func (az *Azure) PricingSourceSummary() interface{} {
 	return az.Pricing
 	return az.Pricing
 }
 }
 
 
+// azureWindowsOS is the node OS label value that identifies a Windows node and
+// the suffix used to qualify Windows-specific pricing keys.
+const azureWindowsOS = "windows"
+
+// isWindowsNode reports whether the node labels identify a Windows node. It
+// centralizes the OS detection shared by azureKey.Features and NodePricing.
+func isWindowsNode(labels map[string]string) bool {
+	osLabel, ok := util.GetOperatingSystem(labels)
+	return ok && strings.ToLower(osLabel) == azureWindowsOS
+}
+
 type azureKey struct {
 type azureKey struct {
 	Labels        map[string]string
 	Labels        map[string]string
 	GPULabel      string
 	GPULabel      string
@@ -451,6 +495,9 @@ func (k *azureKey) Features() string {
 	region := strings.ToLower(r)
 	region := strings.ToLower(r)
 	instance, _ := util.GetInstanceType(k.Labels)
 	instance, _ := util.GetInstanceType(k.Labels)
 	usageType := "ondemand"
 	usageType := "ondemand"
+	if isWindowsNode(k.Labels) {
+		return fmt.Sprintf("%s,%s,%s,%s", region, instance, usageType, azureWindowsOS)
+	}
 	return fmt.Sprintf("%s,%s,%s", region, instance, usageType)
 	return fmt.Sprintf("%s,%s,%s", region, instance, usageType)
 }
 }
 
 
@@ -992,10 +1039,7 @@ func convertMeterToPricings(info commerce.MeterInfo, regions map[string]string,
 		return nil, nil
 		return nil, nil
 	}
 	}
 
 
-	if strings.Contains(meterSubCategory, "Windows") {
-		// This meter doesn't correspond to any pricings.
-		return nil, nil
-	}
+	isWindowsMeter := strings.Contains(meterSubCategory, "Windows")
 
 
 	if strings.Contains(meterSubCategory, "Cloud Services") || strings.Contains(meterSubCategory, "CloudServices") {
 	if strings.Contains(meterSubCategory, "Cloud Services") || strings.Contains(meterSubCategory, "CloudServices") {
 		// This meter doesn't correspond to any pricings.
 		// This meter doesn't correspond to any pricings.
@@ -1079,8 +1123,10 @@ func convertMeterToPricings(info commerce.MeterInfo, regions map[string]string,
 	priceStr := fmt.Sprintf("%f", priceInUsd)
 	priceStr := fmt.Sprintf("%f", priceInUsd)
 	results := make(map[string]*AzurePricing)
 	results := make(map[string]*AzurePricing)
 	for _, instanceType := range instanceTypes {
 	for _, instanceType := range instanceTypes {
-
 		key := fmt.Sprintf("%s,%s,%s", region, instanceType, usageType)
 		key := fmt.Sprintf("%s,%s,%s", region, instanceType, usageType)
+		if isWindowsMeter {
+			key = fmt.Sprintf("%s,%s,%s,%s", region, instanceType, usageType, azureWindowsOS)
+		}
 		pricing := &AzurePricing{
 		pricing := &AzurePricing{
 			Node: &models.Node{
 			Node: &models.Node{
 				Cost:         priceStr,
 				Cost:         priceStr,
@@ -1165,12 +1211,18 @@ func (az *Azure) NodePricing(key models.Key) (*models.Node, models.PricingMetada
 	slv, ok := azKey.Labels[config.SpotLabel]
 	slv, ok := azKey.Labels[config.SpotLabel]
 	isSpot := ok && slv == config.SpotLabelValue && config.SpotLabel != "" && config.SpotLabelValue != ""
 	isSpot := ok && slv == config.SpotLabelValue && config.SpotLabel != "" && config.SpotLabelValue != ""
 
 
+	isWindows := isWindowsNode(azKey.Labels)
+
 	features := strings.Split(azKey.Features(), ",")
 	features := strings.Split(azKey.Features(), ",")
 	region := features[0]
 	region := features[0]
 	instance := features[1]
 	instance := features[1]
 	var featureString string
 	var featureString string
 	if isSpot {
 	if isSpot {
-		featureString = fmt.Sprintf("%s,%s,spot", region, instance)
+		if isWindows {
+			featureString = fmt.Sprintf("%s,%s,spot,%s", region, instance, azureWindowsOS)
+		} else {
+			featureString = fmt.Sprintf("%s,%s,spot", region, instance)
+		}
 	} else {
 	} else {
 		featureString = azKey.Features()
 		featureString = azKey.Features()
 	}
 	}
@@ -1187,7 +1239,7 @@ func (az *Azure) NodePricing(key models.Key) (*models.Node, models.PricingMetada
 		}
 		}
 	}
 	}
 
 
-	cost, err := getRetailPrice(region, instance, config.CurrencyCode, isSpot)
+	cost, err := getRetailPrice(region, instance, config.CurrencyCode, isSpot, isWindows)
 
 
 	if err != nil {
 	if err != nil {
 		log.DedupedWarningf(5, "failed to retrieve retail pricing: %s", err)
 		log.DedupedWarningf(5, "failed to retrieve retail pricing: %s", err)

+ 209 - 13
pkg/cloud/azure/provider_test.go

@@ -69,7 +69,13 @@ func TestConvertMeterToPricings(t *testing.T) {
 		info := meterInfo("Virtual Machines", "D2 Series Windows", "D2s v3", "AU Southeast", 0.3)
 		info := meterInfo("Virtual Machines", "D2 Series Windows", "D2s v3", "AU Southeast", 0.3)
 		results, err := convertMeterToPricings(info, regions, baseCPUPrice)
 		results, err := convertMeterToPricings(info, regions, baseCPUPrice)
 		require.NoError(t, err)
 		require.NoError(t, err)
-		require.Nil(t, results)
+		key := "australiasoutheast,Standard_D2s_v3,ondemand,windows"
+		pricing, ok := results[key]
+		require.Truef(t, ok, "expected a pricing entry under key %q", key)
+		require.NotNil(t, pricing.Node)
+		require.Equal(t, "ondemand", pricing.Node.UsageType)
+		require.Equal(t, "0.300000", pricing.Node.Cost)
+		require.Equal(t, baseCPUPrice, pricing.Node.BaseCPUPrice)
 	})
 	})
 
 
 	t.Run("storage", func(t *testing.T) {
 	t.Run("storage", func(t *testing.T) {
@@ -102,6 +108,86 @@ func TestConvertMeterToPricings(t *testing.T) {
 	})
 	})
 }
 }
 
 
+func TestSelectRetailPrice(t *testing.T) {
+	cases := []struct {
+		name               string
+		linuxRetailPrice   string
+		windowsRetailPrice string
+		spotPrice          string
+		windowsSpotPrice   string
+		spot               bool
+		isWindows          bool
+		expected           string
+		expectErr          bool
+	}{
+		{
+			name:               "windows retail prefers windows price",
+			linuxRetailPrice:   "1.000000",
+			windowsRetailPrice: "2.000000",
+			isWindows:          true,
+			expected:           "2.000000",
+		},
+		{
+			name:             "windows retail falls back to linux when windows missing",
+			linuxRetailPrice: "1.000000",
+			isWindows:        true,
+			expected:         "1.000000",
+		},
+		{
+			name:             "linux retail uses linux price",
+			linuxRetailPrice: "1.000000",
+			isWindows:        false,
+			expected:         "1.000000",
+		},
+		{
+			name:             "windows spot prefers windows spot price",
+			spotPrice:        "0.500000",
+			windowsSpotPrice: "0.900000",
+			spot:             true,
+			isWindows:        true,
+			expected:         "0.900000",
+		},
+		{
+			name:      "windows spot falls back to linux spot when windows missing",
+			spotPrice: "0.500000",
+			spot:      true,
+			isWindows: true,
+			expected:  "0.500000",
+		},
+		{
+			name:      "linux spot uses linux spot price",
+			spotPrice: "0.500000",
+			spot:      true,
+			isWindows: false,
+			expected:  "0.500000",
+		},
+		{
+			name:               "spot windows with no spot price falls back to retail",
+			windowsRetailPrice: "2.000000",
+			spot:               true,
+			isWindows:          true,
+			expected:           "2.000000",
+		},
+		{
+			name:      "no price available returns error",
+			isWindows: true,
+			expectErr: true,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got, err := selectRetailPrice("eastus", "Standard_D2s_v3", tc.linuxRetailPrice, tc.windowsRetailPrice, tc.spotPrice, tc.windowsSpotPrice, tc.spot, tc.isWindows)
+			if tc.expectErr {
+				require.Error(t, err)
+				return
+			}
+			require.NoError(t, err)
+			require.Equal(t, tc.expected, got)
+		})
+	}
+}
+
 func TestAzure_findCostForDisk(t *testing.T) {
 func TestAzure_findCostForDisk(t *testing.T) {
 	var loc string = "location"
 	var loc string = "location"
 	var size int32 = 1
 	var size int32 = 1
@@ -390,14 +476,76 @@ func Test_buildAzureRetailPricesURL(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestAzureKeyFeaturesOS(t *testing.T) {
+	tests := []struct {
+		name     string
+		labels   map[string]string
+		expected string
+	}{
+		{
+			name: "windows node via kubernetes.io/os",
+			labels: map[string]string{
+				"kubernetes.io/os":                 "windows",
+				"node.kubernetes.io/instance-type": "Standard_D4s_v3",
+				"topology.kubernetes.io/region":    "eastus",
+			},
+			expected: "eastus,Standard_D4s_v3,ondemand,windows",
+		},
+		{
+			name: "windows node via beta.kubernetes.io/os",
+			labels: map[string]string{
+				"beta.kubernetes.io/os":            "windows",
+				"node.kubernetes.io/instance-type": "Standard_D4s_v3",
+				"topology.kubernetes.io/region":    "eastus",
+			},
+			expected: "eastus,Standard_D4s_v3,ondemand,windows",
+		},
+		{
+			name: "linux node",
+			labels: map[string]string{
+				"kubernetes.io/os":                 "linux",
+				"node.kubernetes.io/instance-type": "Standard_D4s_v3",
+				"topology.kubernetes.io/region":    "eastus",
+			},
+			expected: "eastus,Standard_D4s_v3,ondemand",
+		},
+		{
+			name: "no OS label defaults to linux key",
+			labels: map[string]string{
+				"node.kubernetes.io/instance-type": "Standard_D4s_v3",
+				"topology.kubernetes.io/region":    "eastus",
+			},
+			expected: "eastus,Standard_D4s_v3,ondemand",
+		},
+		{
+			name: "windows case-insensitive",
+			labels: map[string]string{
+				"kubernetes.io/os":                 "Windows",
+				"node.kubernetes.io/instance-type": "Standard_D4s_v3",
+				"topology.kubernetes.io/region":    "eastus",
+			},
+			expected: "eastus,Standard_D4s_v3,ondemand,windows",
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			key := &azureKey{Labels: tc.labels}
+			require.Equal(t, tc.expected, key.Features())
+		})
+	}
+}
+
 func Test_extractAzureVMRetailAndSpotPrices(t *testing.T) {
 func Test_extractAzureVMRetailAndSpotPrices(t *testing.T) {
 	testCases := []struct {
 	testCases := []struct {
-		name             string
-		jsonResponse     string
-		expectedRetail   string
-		expectedSpot     string
-		expectedError    bool
-		expectedErrorMsg string
+		name                  string
+		jsonResponse          string
+		expectedRetail        string
+		expectedWindowsRetail string
+		expectedSpot          string
+		expectedWindowsSpot   string
+		expectedError         bool
+		expectedErrorMsg      string
 	}{
 	}{
 		{
 		{
 			name: "valid response with retail and spot prices",
 			name: "valid response with retail and spot prices",
@@ -503,7 +651,7 @@ func Test_extractAzureVMRetailAndSpotPrices(t *testing.T) {
 			expectedError:  false,
 			expectedError:  false,
 		},
 		},
 		{
 		{
-			name: "filters out Windows instances",
+			name: "returns separate Windows and Linux prices",
 			jsonResponse: `{
 			jsonResponse: `{
 				"BillingCurrency": "USD",
 				"BillingCurrency": "USD",
 				"CustomerEntityId": "Default",
 				"CustomerEntityId": "Default",
@@ -528,9 +676,35 @@ func Test_extractAzureVMRetailAndSpotPrices(t *testing.T) {
 				],
 				],
 				"Count": 2
 				"Count": 2
 			}`,
 			}`,
-			expectedRetail: "0.192000",
-			expectedSpot:   "",
-			expectedError:  false,
+			expectedRetail:        "0.192000",
+			expectedWindowsRetail: "0.500000",
+			expectedSpot:          "",
+			expectedWindowsSpot:   "",
+			expectedError:         false,
+		},
+		{
+			name: "windows spot price available",
+			jsonResponse: `{
+				"BillingCurrency": "USD",
+				"CustomerEntityId": "Default",
+				"CustomerEntityType": "Retail",
+				"Items": [
+					{
+						"currencyCode": "USD",
+						"retailPrice": 0.12,
+						"armRegionName": "eastus",
+						"productName": "Virtual Machines Dsv3 Series Windows",
+						"skuName": "D4s v3 Spot",
+						"armSkuName": "Standard_D4s_v3"
+					}
+				],
+				"Count": 1
+			}`,
+			expectedRetail:        "",
+			expectedWindowsRetail: "",
+			expectedSpot:          "",
+			expectedWindowsSpot:   "0.120000",
+			expectedError:         false,
 		},
 		},
 		{
 		{
 			name: "filters out low priority instances",
 			name: "filters out low priority instances",
@@ -600,7 +774,7 @@ func Test_extractAzureVMRetailAndSpotPrices(t *testing.T) {
 				Body:       io.NopCloser(bytes.NewBufferString(tc.jsonResponse)),
 				Body:       io.NopCloser(bytes.NewBufferString(tc.jsonResponse)),
 			}
 			}
 
 
-			retailPrice, spotPrice, err := extractAzureVMRetailAndSpotPrices(resp)
+			linuxRetail, windowsRetail, spotPrice, windowsSpotPrice, err := extractAzureVMRetailAndSpotPrices(resp)
 
 
 			if tc.expectedError {
 			if tc.expectedError {
 				require.Error(t, err)
 				require.Error(t, err)
@@ -609,9 +783,31 @@ func Test_extractAzureVMRetailAndSpotPrices(t *testing.T) {
 				}
 				}
 			} else {
 			} else {
 				require.NoError(t, err)
 				require.NoError(t, err)
-				require.Equal(t, tc.expectedRetail, retailPrice, "Retail price mismatch")
+				require.Equal(t, tc.expectedRetail, linuxRetail, "Linux retail price mismatch")
+				require.Equal(t, tc.expectedWindowsRetail, windowsRetail, "Windows retail price mismatch")
 				require.Equal(t, tc.expectedSpot, spotPrice, "Spot price mismatch")
 				require.Equal(t, tc.expectedSpot, spotPrice, "Spot price mismatch")
+				require.Equal(t, tc.expectedWindowsSpot, windowsSpotPrice, "Windows spot price mismatch")
 			}
 			}
 		})
 		})
 	}
 	}
 }
 }
+
+// failingReader is an io.Reader that always errors, used to exercise the
+// response body read-failure path in extractAzureVMRetailAndSpotPrices.
+type failingReader struct{}
+
+func (failingReader) Read(_ []byte) (int, error) {
+	return 0, fmt.Errorf("simulated read failure")
+}
+
+func Test_extractAzureVMRetailAndSpotPrices_bodyReadError(t *testing.T) {
+	resp := &http.Response{
+		StatusCode: 200,
+		Body:       io.NopCloser(failingReader{}),
+	}
+
+	_, _, _, _, err := extractAzureVMRetailAndSpotPrices(resp)
+
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "error getting response")
+}

+ 6 - 1
pkg/cloud/gcp/provider.go

@@ -17,6 +17,7 @@ import (
 
 
 	coreenv "github.com/opencost/opencost/core/pkg/env"
 	coreenv "github.com/opencost/opencost/core/pkg/env"
 	"github.com/opencost/opencost/pkg/cloud/aws"
 	"github.com/opencost/opencost/pkg/cloud/aws"
+	"github.com/opencost/opencost/pkg/cloud/httputil"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/utils"
 	"github.com/opencost/opencost/pkg/cloud/utils"
 
 
@@ -979,7 +980,9 @@ func (gcp *GCP) getBillingAPIClientAndURL(apiKey, currencyCode string) (*http.Cl
 	url := gcp.buildBillingAPIURL(apiKey, currencyCode)
 	url := gcp.buildBillingAPIURL(apiKey, currencyCode)
 
 
 	if apiKey != "" {
 	if apiKey != "" {
-		return http.DefaultClient, url.String(), nil
+		// Shared client carries a request timeout so a hung billing endpoint
+		// can't block the pricing refresh.
+		return httputil.BoundedClient(), url.String(), nil
 	}
 	}
 
 
 	googleHttpClient, err := google.DefaultClient(context.TODO(), GCPCloudOAuthScope)
 	googleHttpClient, err := google.DefaultClient(context.TODO(), GCPCloudOAuthScope)
@@ -987,6 +990,8 @@ func (gcp *GCP) getBillingAPIClientAndURL(apiKey, currencyCode string) (*http.Cl
 		log.Errorf("GCP Billing API: Workload Identity detected but failed to create authenticated client: %v", err)
 		log.Errorf("GCP Billing API: Workload Identity detected but failed to create authenticated client: %v", err)
 		return nil, "", err
 		return nil, "", err
 	}
 	}
+	// google.DefaultClient has no timeout by default; bound it to match the keyed path.
+	googleHttpClient.Timeout = httputil.PricingTimeout
 
 
 	return googleHttpClient, url.String(), nil
 	return googleHttpClient, url.String(), nil
 }
 }

+ 3 - 2
pkg/cloud/gcp/provider_test.go

@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"bytes"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"net/http"
 	"net/url"
 	"net/url"
 	"os"
 	"os"
 	"reflect"
 	"reflect"
@@ -14,6 +13,7 @@ import (
 
 
 	"github.com/google/martian/log"
 	"github.com/google/martian/log"
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/clustercache"
+	"github.com/opencost/opencost/pkg/cloud/httputil"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/config"
 	"github.com/opencost/opencost/pkg/config"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/assert"
@@ -758,7 +758,8 @@ func TestGCP_getBillingAPIClientAndURL(t *testing.T) {
 	client, rawURL, err := gcp.getBillingAPIClientAndURL("test-key", "USD")
 	client, rawURL, err := gcp.getBillingAPIClientAndURL("test-key", "USD")
 
 
 	assert.NoError(t, err)
 	assert.NoError(t, err)
-	assert.Equal(t, http.DefaultClient, client)
+	assert.NotNil(t, client)
+	assert.Equal(t, httputil.PricingTimeout, client.Timeout)
 
 
 	parsedURL, err := url.Parse(rawURL)
 	parsedURL, err := url.Parse(rawURL)
 	assert.NoError(t, err)
 	assert.NoError(t, err)

+ 78 - 0
pkg/cloud/httputil/httputil.go

@@ -0,0 +1,78 @@
+// Package httputil provides shared HTTP clients for cloud pricing ingestion.
+//
+// The default net/http client has no timeout, so a hung or unreachable provider
+// pricing endpoint can block pricing refresh indefinitely. These helpers return
+// clients with sensible timeouts. They are shared and reused so a new connection
+// pool is not created on every pricing fetch.
+package httputil
+
+import (
+	"context"
+	"net"
+	"net/http"
+	"time"
+)
+
+// PricingTimeout is applied to pricing HTTP requests. For the bounded client it
+// caps the whole request; for the streaming client it caps the wait for
+// response headers only (not the body read).
+const PricingTimeout = 30 * time.Second
+
+var (
+	boundedClient   = &http.Client{Timeout: PricingTimeout}
+	streamingClient = newStreamingClient(http.DefaultTransport)
+)
+
+// BoundedClient returns a shared http.Client with a total request timeout,
+// suitable for small or bounded pricing API responses.
+func BoundedClient() *http.Client {
+	return boundedClient
+}
+
+// StreamingClient returns a shared http.Client for large pricing downloads (for
+// example the AWS pricing file or the Azure price sheet). It bounds the connect,
+// TLS handshake, and response-header wait, but not the total body read, so a
+// legitimately large or slow download is not truncated while a hung endpoint is
+// still abandoned.
+func StreamingClient() *http.Client {
+	return streamingClient
+}
+
+// StreamingGet issues a GET for a large download using the streaming client and
+// the caller's context, so the request is cancelable. It centralizes the
+// context-aware request construction shared by the large-download paths (the AWS
+// pricing file and the Azure price sheet). Callers without a context of their
+// own can pass context.Background().
+func StreamingGet(ctx context.Context, url string) (*http.Response, error) {
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return nil, err
+	}
+	return StreamingClient().Do(req)
+}
+
+// newStreamingClient builds the streaming client from a base RoundTripper. It
+// takes the base transport as a parameter so the fallback path can be exercised
+// by tests; production passes http.DefaultTransport.
+func newStreamingClient(base http.RoundTripper) *http.Client {
+	// Clone the base transport when possible so we keep its dial and TLS
+	// handshake timeouts. Guard the type assertion: http.DefaultTransport is
+	// declared as a RoundTripper and can be replaced (e.g. in tests), in which
+	// case we fall back to a fresh transport rather than panicking.
+	transport, ok := base.(*http.Transport)
+	if ok {
+		transport = transport.Clone()
+	} else {
+		// base is not a *http.Transport, so we can't clone its timeouts. Build a
+		// fresh transport that still bounds dial and TLS handshake time, matching
+		// the guarantee in StreamingClient's doc.
+		transport = &http.Transport{
+			Proxy:                 http.ProxyFromEnvironment,
+			DialContext:           (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
+			TLSHandshakeTimeout:   10 * time.Second,
+			ExpectContinueTimeout: 1 * time.Second,
+		}
+	}
+	transport.ResponseHeaderTimeout = PricingTimeout
+	return &http.Client{Transport: transport}
+}

+ 130 - 0
pkg/cloud/httputil/httputil_test.go

@@ -0,0 +1,130 @@
+package httputil
+
+import (
+	"context"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+)
+
+func TestBoundedClientHasTotalTimeout(t *testing.T) {
+	c := BoundedClient()
+	if c.Timeout != PricingTimeout {
+		t.Fatalf("expected total timeout %v, got %v", PricingTimeout, c.Timeout)
+	}
+}
+
+func TestBoundedClientIsShared(t *testing.T) {
+	if BoundedClient() != BoundedClient() {
+		t.Fatal("expected BoundedClient to return a shared instance")
+	}
+}
+
+func TestStreamingClientHasNoTotalTimeout(t *testing.T) {
+	c := StreamingClient()
+	if c.Timeout != 0 {
+		t.Fatalf("streaming client must not set a total timeout, got %v", c.Timeout)
+	}
+}
+
+func TestStreamingClientHasResponseHeaderTimeout(t *testing.T) {
+	c := StreamingClient()
+	tr, ok := c.Transport.(*http.Transport)
+	if !ok {
+		t.Fatalf("expected *http.Transport, got %T", c.Transport)
+	}
+	if tr.ResponseHeaderTimeout != PricingTimeout {
+		t.Fatalf("expected response-header timeout %v, got %v", PricingTimeout, tr.ResponseHeaderTimeout)
+	}
+}
+
+func TestStreamingClientIsShared(t *testing.T) {
+	if StreamingClient() != StreamingClient() {
+		t.Fatal("expected StreamingClient to return a shared instance")
+	}
+}
+
+type roundTripperFunc func(*http.Request) (*http.Response, error)
+
+func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
+	return f(r)
+}
+
+// A base transport that is not a *http.Transport must not panic and must still
+// yield a usable client with the response-header timeout applied.
+func TestNewStreamingClientFallsBackWhenNotTransport(t *testing.T) {
+	// Honor the RoundTripper contract (non-nil response when error is nil), even
+	// though this base is only used to exercise the fallback and never round-trips.
+	base := roundTripperFunc(func(*http.Request) (*http.Response, error) {
+		return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
+	})
+	c := newStreamingClient(base)
+	tr, ok := c.Transport.(*http.Transport)
+	if !ok {
+		t.Fatalf("expected fallback *http.Transport, got %T", c.Transport)
+	}
+	if tr.ResponseHeaderTimeout != PricingTimeout {
+		t.Fatalf("expected response-header timeout %v, got %v", PricingTimeout, tr.ResponseHeaderTimeout)
+	}
+	// The fallback transport must still bound TLS handshake time.
+	if tr.TLSHandshakeTimeout == 0 {
+		t.Fatal("expected fallback transport to set a TLS handshake timeout")
+	}
+}
+
+func TestStreamingGetReturnsBody(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		_, _ = w.Write([]byte("price-data"))
+	}))
+	defer srv.Close()
+
+	resp, err := StreamingGet(context.Background(), srv.URL)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	defer resp.Body.Close()
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		t.Fatalf("reading body: %v", err)
+	}
+	if string(body) != "price-data" {
+		t.Fatalf("expected body %q, got %q", "price-data", string(body))
+	}
+}
+
+func TestStreamingGetHonorsCanceledContext(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		_, _ = w.Write([]byte("ok"))
+	}))
+	defer srv.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel() // cancel before the request runs
+
+	if _, err := StreamingGet(ctx, srv.URL); err == nil {
+		t.Fatal("expected an error from a canceled context, got nil")
+	}
+}
+
+func TestStreamingGetRejectsBadURL(t *testing.T) {
+	if _, err := StreamingGet(context.Background(), "://not-a-url"); err == nil {
+		t.Fatal("expected an error building the request, got nil")
+	}
+}
+
+// Cloning must not mutate the shared default transport.
+func TestNewStreamingClientClonesWithoutMutatingDefault(t *testing.T) {
+	def, ok := http.DefaultTransport.(*http.Transport)
+	if !ok {
+		t.Skip("default transport is not *http.Transport in this environment")
+	}
+	c := newStreamingClient(def)
+	tr := c.Transport.(*http.Transport)
+	if tr == def {
+		t.Fatal("expected a cloned transport, got the shared default")
+	}
+	if def.ResponseHeaderTimeout == PricingTimeout {
+		t.Fatal("mutated the shared default transport's ResponseHeaderTimeout")
+	}
+}

+ 5 - 2
pkg/cloud/oracle/ratecard.go

@@ -8,6 +8,7 @@ import (
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 
 
+	"github.com/opencost/opencost/pkg/cloud/httputil"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/models"
 )
 )
 
 
@@ -52,8 +53,10 @@ func NewRateCardStore(url, currencyCode string) *RateCardStore {
 	return &RateCardStore{
 	return &RateCardStore{
 		url:          url,
 		url:          url,
 		currencyCode: currencyCode,
 		currencyCode: currencyCode,
-		client:       &http.Client{},
-		prices:       map[string]Price{},
+		// A zero-value http.Client has no timeout; use the shared bounded client
+		// so a stalled rate-card endpoint can't hang ingestion.
+		client: httputil.BoundedClient(),
+		prices: map[string]Price{},
 	}
 	}
 }
 }
 
 

+ 4 - 1
pkg/cloud/otc/pricingapi.go

@@ -7,9 +7,12 @@ import (
 	"net/http"
 	"net/http"
 
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/pkg/cloud/httputil"
 )
 )
 
 
-var otcHTTPClient = http.DefaultClient
+// http.DefaultClient has no timeout, so a hung pricing endpoint would block the
+// paginated fetch loop forever. Use the shared bounded client instead.
+var otcHTTPClient = httputil.BoundedClient()
 
 
 // Fetches and flattens all product entries across multiple services with pagination
 // Fetches and flattens all product entries across multiple services with pagination
 func (otc *OTC) fetchPaginatedProducts(serviceNames []string) ([]Product, error) {
 func (otc *OTC) fetchPaginatedProducts(serviceNames []string) ([]Product, error) {

+ 17 - 1
pkg/customcost/ingestor.go

@@ -311,11 +311,27 @@ func (ing *CustomCostIngestor) Status() IngestorStatus {
 		LastRun:     ing.lastRun,
 		LastRun:     ing.lastRun,
 		NextRun:     ing.lastRun.Add(ing.refreshRate).UTC(),
 		NextRun:     ing.lastRun.Add(ing.refreshRate).UTC(),
 		Runs:        ing.runs,
 		Runs:        ing.runs,
-		Coverage:    ing.coverage,
+		Coverage:    ing.copyCoverage(),
 		RefreshRate: ing.refreshRate,
 		RefreshRate: ing.refreshRate,
 	}
 	}
 }
 }
 
 
+// copyCoverage returns a shallow copy of the coverage map taken under the lock.
+// Returning ing.coverage directly would hand callers a live reference: the
+// /customCost/status handler serializes it (which iterates it), and that
+// iteration would race expandCoverage() writing under coverageLock, risking a
+// fatal "concurrent map iteration and map write" crash. expandCoverage replaces
+// Window values wholesale, so a shallow copy is sufficient.
+func (ing *CustomCostIngestor) copyCoverage() map[string]opencost.Window {
+	ing.coverageLock.Lock()
+	defer ing.coverageLock.Unlock()
+	coverage := make(map[string]opencost.Window, len(ing.coverage))
+	for plugin, window := range ing.coverage {
+		coverage[plugin] = window
+	}
+	return coverage
+}
+
 func (ing *CustomCostIngestor) build(rebuild bool) {
 func (ing *CustomCostIngestor) build(rebuild bool) {
 	defer errors.HandlePanic()
 	defer errors.HandlePanic()
 	e := opencost.RoundBack(time.Now().UTC(), ing.resolution)
 	e := opencost.RoundBack(time.Now().UTC(), ing.resolution)

+ 74 - 0
pkg/customcost/ingestor_test.go

@@ -260,6 +260,80 @@ func TestBuildSingleDomain_DispenseError(t *testing.T) {
 	ingestor.BuildWindow(now.Add(-time.Hour), now)
 	ingestor.BuildWindow(now.Add(-time.Hour), now)
 }
 }
 
 
+// TestIngestor_Status_ReturnsCopyOfCoverage deterministically proves Status()
+// hands back a copy, not the live map: mutating the returned Coverage must not
+// leak into the ingestor's internal state. Unlike the concurrent test below,
+// it catches a regression without needing the race detector.
+func TestIngestor_Status_ReturnsCopyOfCoverage(t *testing.T) {
+	ingestor := &CustomCostIngestor{
+		coverage: map[string]opencost.Window{},
+	}
+	start := time.Now().UTC()
+	end := start.Add(time.Hour)
+	ingestor.expandCoverage(opencost.NewWindow(&start, &end), "plugin-a")
+
+	status := ingestor.Status()
+	if len(status.Coverage) != 1 {
+		t.Fatalf("expected 1 coverage entry, got %d", len(status.Coverage))
+	}
+
+	// Mutating the returned map must not affect the ingestor.
+	status.Coverage["plugin-b"] = opencost.NewWindow(&start, &end)
+	delete(status.Coverage, "plugin-a")
+
+	again := ingestor.Status()
+	if _, ok := again.Coverage["plugin-a"]; !ok {
+		t.Error("plugin-a should remain in the ingestor's coverage; Status() leaked a live reference")
+	}
+	if _, ok := again.Coverage["plugin-b"]; ok {
+		t.Error("plugin-b leaked into the ingestor's coverage; Status() returned a live reference")
+	}
+}
+
+// TestIngestor_Status_ConcurrentWithExpandCoverage guards against a data race:
+// if Status() returned the coverage map by reference, the /customCost/status
+// handler serializing it (which iterates it) would race expandCoverage()
+// writing under coverageLock and could crash the process with "concurrent map
+// iteration and map write". Run with -race to detect a regression.
+func TestIngestor_Status_ConcurrentWithExpandCoverage(t *testing.T) {
+	ingestor := &CustomCostIngestor{
+		coverage: map[string]opencost.Window{},
+	}
+
+	start := time.Now().UTC()
+	end := start.Add(time.Hour)
+	window := opencost.NewWindow(&start, &end)
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	// writer: continuously expands coverage under the lock
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 2000; i++ {
+			ingestor.expandCoverage(window, fmt.Sprintf("plugin-%d", i%16))
+		}
+	}()
+
+	// reader: reads Status() and iterates the returned map, mimicking the JSON
+	// serialization the /customCost/status handler performs on the response
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 2000; i++ {
+			for range ingestor.Status().Coverage {
+			}
+		}
+	}()
+
+	wg.Wait()
+
+	// The writer touched 16 distinct plugins (i%16), so coverage should hold
+	// exactly 16 entries now that both goroutines have finished.
+	if got := len(ingestor.Status().Coverage); got != 16 {
+		t.Fatalf("expected 16 coverage entries after concurrent writes, got %d", got)
+	}
+}
+
 func TestBuildSingleDomain_ClientError(t *testing.T) {
 func TestBuildSingleDomain_ClientError(t *testing.T) {
 	mock := &mockPluginConnector{
 	mock := &mockPluginConnector{
 		clientErr: fmt.Errorf("connection failed"),
 		clientErr: fmt.Errorf("connection failed"),