Kaynağa Gözat

feat(aws): Use DescribeSpotPriceHistory as fallback for spot pricing

When the AWS spot data feed is delayed (up to 15+ hours in practice),
OpenCost was falling back to on-demand pricing for spot nodes, overstating
costs by ~2.6x. This adds a DescribeSpotPriceHistory API fallback that
provides accurate spot pricing when the spot data feed hasn't yet
delivered data for a running instance.

The fallback is always initialized regardless of whether the spot data
feed is configured, so it also helps clusters that use spot instances
but haven't set up the spot data feed at all.

Fixes https://github.com/opencost/opencost/issues/3725

Signed-off-by: Warwick Peatey <warwick@automatic.systems>
Assisted-by: Claude Code
Warwick Peatey 4 hafta önce
ebeveyn
işleme
5b04729182

+ 124 - 28
pkg/cloud/aws/provider.go

@@ -53,6 +53,7 @@ const (
 
 	APIPricingSource              = "Public API"
 	SpotPricingSource             = "Spot Data Feed"
+	SpotPriceHistorySource        = "Spot Price History"
 	ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
 	FargatePricingSource          = "Fargate"
 
@@ -96,11 +97,7 @@ func (aws *AWS) PricingSourceStatus() map[string]*models.PricingSource {
 		Enabled: true,
 	}
 
-	if !aws.SpotRefreshEnabled() {
-		sps.Available = false
-		sps.Error = "Spot instances not set up"
-		sps.Enabled = false
-	} else {
+	if aws.SpotFeedRefreshEnabled() {
 		sps.Error = ""
 		if aws.SpotPricingError != nil {
 			sps.Error = aws.SpotPricingError.Error()
@@ -112,9 +109,28 @@ func (aws *AWS) PricingSourceStatus() map[string]*models.PricingSource {
 		} else {
 			sps.Error = "No spot instances detected"
 		}
+	} else {
+		sps.Available = false
+		sps.Error = "Spot instances not set up"
+		sps.Enabled = false
 	}
 	sources[SpotPricingSource] = sps
 
+	sphs := &models.PricingSource{
+		Name:    SpotPriceHistorySource,
+		Enabled: true,
+	}
+	if aws.SpotPriceHistoryError != nil {
+		sphs.Error = aws.SpotPriceHistoryError.Error()
+		sphs.Available = false
+	} else if aws.SpotPriceHistoryCache == nil {
+		sphs.Error = "Not yet initialized"
+		sphs.Available = false
+	} else {
+		sphs.Available = true
+	}
+	sources[SpotPriceHistorySource] = sphs
+
 	rps := &models.PricingSource{
 		Name:    ReservedInstancePricingSource,
 		Enabled: true,
@@ -185,6 +201,8 @@ type AWS struct {
 	SpotRefreshRunning          bool
 	SpotPricingLock             sync.RWMutex
 	SpotPricingError            error
+	SpotPriceHistoryCache       *SpotPriceHistoryCache
+	SpotPriceHistoryError       error
 	RIPricingByInstanceID       map[string]*RIData
 	RIPricingError              error
 	RIDataRunning               bool
@@ -848,8 +866,8 @@ func (aws *AWS) getRegionPricing(nodeList []*clustercache.Node) (*http.Response,
 	return resp, pricingURL, err
 }
 
-// SpotRefreshEnabled determines whether the required configs to run the spot feed query have been set up
-func (aws *AWS) SpotRefreshEnabled() bool {
+// SpotFeedRefreshEnabled determines whether the required configs to run the spot feed query have been set up
+func (aws *AWS) SpotFeedRefreshEnabled() bool {
 	// Guard against nil receiver
 	if aws == nil {
 		return false
@@ -1019,28 +1037,36 @@ func (aws *AWS) DownloadPricingData() error {
 	}
 	log.Infof("Finished downloading \"%s\"", pricingURL)
 
-	if !aws.SpotRefreshEnabled() {
-		return nil
+	// Initialize a spot price history cache if not already initialized.
+	// Reset error to allow retrying on subsequent DownloadPricingData calls.
+	if aws.SpotPriceHistoryCache == nil {
+		aws.SpotPriceHistoryError = nil
+		aws.SpotPriceHistoryCache, aws.SpotPriceHistoryError = aws.initializeSpotPriceHistoryCache()
+		if aws.SpotPriceHistoryError != nil {
+			log.Errorf("Failed to initialize spot price history manager: %v", aws.SpotPriceHistoryError)
+		}
 	}
 
-	// Always run spot pricing refresh when performing download
-	aws.refreshSpotPricing(true)
+	if aws.SpotFeedRefreshEnabled() {
+		// Always run spot pricing refresh when performing download
+		aws.refreshSpotPricing(true)
 
-	// Only start a single refresh goroutine
-	if !aws.SpotRefreshRunning {
-		aws.SpotRefreshRunning = true
+		// Only start a single refresh goroutine
+		if !aws.SpotRefreshRunning {
+			aws.SpotRefreshRunning = true
 
-		go func() {
-			defer errs.HandlePanic()
+			go func() {
+				defer errs.HandlePanic()
 
-			for {
-				log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
-				time.Sleep(SpotRefreshDuration)
+				for {
+					log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
+					time.Sleep(SpotRefreshDuration)
 
-				// Reoccurring refresh checks update times
-				aws.refreshSpotPricing(false)
-			}
-		}()
+					// Reoccurring refresh checks update times
+					aws.refreshSpotPricing(false)
+				}
+			}()
+		}
 	}
 
 	return nil
@@ -1278,6 +1304,60 @@ func (aws *AWS) refreshSpotPricing(force bool) {
 	aws.SpotPricingByInstanceID = sp
 }
 
+// initializeSpotPriceHistoryCache builds a SpotPriceHistoryCache backed by an
+// AWSSpotPriceHistoryFetcher. It returns an error when the AWS access key
+// cannot be loaded, when ClusterRegion is empty, or when the AWS config cannot
+// be created.
+func (aws *AWS) initializeSpotPriceHistoryCache() (*SpotPriceHistoryCache, error) {
+	log.Info("Initializing AWS Spot Price History Manager")
+
+	creds, err := aws.GetAWSAccessKey()
+	if err != nil {
+		return nil, fmt.Errorf("getting AWS access key for spot price history: %w", err)
+	}
+
+	// Only the base config is tied to the cluster region; the fetcher derives
+	// region-specific EC2 clients from it as lookups for other regions arrive.
+	clusterRegion := aws.ClusterRegion
+	if clusterRegion == "" {
+		return nil, fmt.Errorf("no cluster region configured")
+	}
+
+	baseConfig, err := creds.CreateConfig(clusterRegion)
+	if err != nil {
+		return nil, fmt.Errorf("creating AWS config for spot price history: %w", err)
+	}
+
+	fetcher := NewAWSSpotPriceHistoryFetcher(baseConfig)
+	return NewSpotPriceHistoryCache(fetcher), nil
+}
+
+// spotPricingFromHistory looks up the spot price for the node described by k
+// in the spot price history cache. It reports false when the cache has not
+// been initialized, when k is not an *awsKey, when the region, instance type,
+// or zone cannot be resolved from the key's labels, or when the cache lookup
+// returns an error.
+func (aws *AWS) spotPricingFromHistory(k models.Key) (*SpotPriceHistoryEntry, bool) {
+	if aws.SpotPriceHistoryCache == nil {
+		return nil, false
+	}
+
+	// The local is deliberately not called awsKey: that name would shadow the
+	// awsKey type for the rest of the function.
+	nodeKey, ok := k.(*awsKey)
+	if !ok {
+		log.DedupedWarningf(10, "Failed to cast key to awsKey for spot price history lookup: %s", k.ID())
+		return nil, false
+	}
+
+	region, regionOk := util.GetRegion(nodeKey.Labels)
+	instanceType, instanceTypeOk := util.GetInstanceType(nodeKey.Labels)
+	availabilityZone, availabilityZoneOk := util.GetZone(nodeKey.Labels)
+	// All three values are needed to build the cache key; skip the lookup if any is missing.
+	if !regionOk || !instanceTypeOk || !availabilityZoneOk {
+		log.DedupedWarningf(10, "Missing required info for spot price history lookup (region: %s, instanceType: %s, zone: %s): %s", region, instanceType, availabilityZone, k.ID())
+		return nil, false
+	}
+
+	price, err := aws.SpotPriceHistoryCache.GetSpotPrice(region, instanceType, availabilityZone)
+	if err != nil {
+		log.DedupedWarningf(10, "Failed to get spot price history for instance %s: %v", k.ID(), err)
+		return nil, false
+	}
+	return price, true
+}
+
 // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
 func (aws *AWS) NetworkPricing() (*models.Network, error) {
 	cpricing, err := aws.Config.GetCustomPricingData()
@@ -1403,11 +1483,29 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
 			UsageType:    PreemptibleType,
 		}, meta, nil
 	} else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
-		if aws.SpotRefreshEnabled() {
-			log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
+		log.DedupedWarningf(5, "Node %s marked preemptible but no spot feed data available; falling back to other pricing sources", k.ID())
+
+		// Try to get spot pricing from DescribeSpotPriceHistory API
+		if historyEntry, ok := aws.spotPricingFromHistory(k); ok {
+			log.DedupedInfof(5, "Using spot price history data for node %s: $%f", k.ID(), historyEntry.SpotPrice)
+			spotHistoryCost := fmt.Sprintf("%f", historyEntry.SpotPrice)
+			meta.Source = SpotPriceHistorySource
+			return &models.Node{
+				Cost:         spotHistoryCost,
+				VCPU:         terms.VCpu,
+				RAM:          terms.Memory,
+				GPU:          terms.GPU,
+				Storage:      terms.Storage,
+				BaseCPUPrice: aws.BaseCPUPrice,
+				BaseRAMPrice: aws.BaseRAMPrice,
+				BaseGPUPrice: aws.BaseGPUPrice,
+				UsageType:    PreemptibleType,
+			}, meta, nil
 		}
+
 		if publicPricingFound {
 			// return public price if found
+			log.DedupedWarningf(5, "No spot price history available for %s, falling back to on-demand pricing", k.ID())
 			return &models.Node{
 				Cost:         cost,
 				VCPU:         terms.VCpu,
@@ -1421,9 +1519,7 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
 			}, meta, nil
 		} else {
 			// return defaults if public pricing not found
-			if aws.SpotRefreshEnabled() {
-				log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
-			}
+			log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
 			return &models.Node{
 				VCPU:         terms.VCpu,
 				VCPUCost:     aws.BaseSpotCPUPrice,

+ 377 - 7
pkg/cloud/aws/provider_test.go

@@ -2,12 +2,14 @@ package aws
 
 import (
 	"encoding/json"
+	"errors"
 	"io"
 	"net/http"
 	"net/url"
 	"os"
 	"reflect"
 	"testing"
+	"time"
 
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/pkg/cloud/models"
@@ -867,7 +869,7 @@ func (f *fakeProviderConfig) ConfigFileManager() *config.ConfigFileManager {
 	return nil
 }
 
-func TestAWS_SpotRefreshEnabled(t *testing.T) {
+func TestAWS_SpotFeedRefreshEnabled(t *testing.T) {
 	tests := []struct {
 		name                string
 		spotDataBucket      string
@@ -955,9 +957,9 @@ func TestAWS_SpotRefreshEnabled(t *testing.T) {
 				},
 			}
 
-			got := aws.SpotRefreshEnabled()
+			got := aws.SpotFeedRefreshEnabled()
 			if got != tt.want {
-				t.Errorf("AWS.SpotRefreshEnabled() = %v, want %v", got, tt.want)
+				t.Errorf("AWS.SpotFeedRefreshEnabled() = %v, want %v", got, tt.want)
 			}
 		})
 	}
@@ -971,10 +973,10 @@ func TestAWS_SpotRefreshEnabled(t *testing.T) {
 			Config:         nil, // nil Config should not cause panic
 		}
 
-		got := aws.SpotRefreshEnabled()
+		got := aws.SpotFeedRefreshEnabled()
 		want := true // Should fall back to field-based check
 		if got != want {
-			t.Errorf("AWS.SpotRefreshEnabled() with nil Config = %v, want %v", got, want)
+			t.Errorf("AWS.SpotFeedRefreshEnabled() with nil Config = %v, want %v", got, want)
 		}
 	})
 
@@ -986,10 +988,378 @@ func TestAWS_SpotRefreshEnabled(t *testing.T) {
 			Config:         nil, // nil Config should not cause panic
 		}
 
-		got := aws.SpotRefreshEnabled()
+		got := aws.SpotFeedRefreshEnabled()
 		want := false // No fields set, should return false
 		if got != want {
-			t.Errorf("AWS.SpotRefreshEnabled() with nil Config and no fields = %v, want %v", got, want)
+			t.Errorf("AWS.SpotFeedRefreshEnabled() with nil Config and no fields = %v, want %v", got, want)
+		}
+	})
+}
+
+func TestAWS_spotPricingFromHistory(t *testing.T) {
+	t.Run("nil cache returns false", func(t *testing.T) {
+		aws := &AWS{}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when cache is nil")
+		}
+	})
+
+	t.Run("missing region label returns false", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when region label is missing")
+		}
+	})
+
+	t.Run("missing instance type label returns false", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region": "us-east-1",
+				"topology.kubernetes.io/zone":   "us-east-1a",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when instance type label is missing")
+		}
+	})
+
+	t.Run("missing zone label returns false", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"node.kubernetes.io/instance-type": "m5.large",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when zone label is missing")
+		}
+	})
+
+	t.Run("fetcher error returns false", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				return nil, errors.New("api error")
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when fetcher returns error")
+		}
+	})
+
+	t.Run("successful lookup returns entry", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				if key.Region != "us-east-1" || key.InstanceType != "m5.large" || key.AvailabilityZone != "us-east-1a" {
+					t.Errorf("Unexpected key: %v", key)
+				}
+				return &SpotPriceHistoryEntry{
+					SpotPrice:   0.042,
+					Timestamp:   time.Now(),
+					RetrievedAt: time.Now(),
+				}, nil
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+			},
+		}
+		entry, ok := aws.spotPricingFromHistory(key)
+		if !ok {
+			t.Fatal("Expected true for successful lookup")
+		}
+		if entry.SpotPrice != 0.042 {
+			t.Errorf("Expected spot price 0.042, got %f", entry.SpotPrice)
+		}
+	})
+}
+
+func TestAWS_createNode_spotHistoryFallback(t *testing.T) {
+	// Helper to build AWSProductTerms with on-demand pricing
+	makeTerms := func(sku, offerTermCode, cost string) *AWSProductTerms {
+		priceKey := sku + "." + offerTermCode + "." + HourlyRateCode
+		return &AWSProductTerms{
+			Sku: sku,
+			OnDemand: &AWSOfferTerm{
+				Sku:           sku,
+				OfferTermCode: offerTermCode,
+				PriceDimensions: map[string]*AWSRateCode{
+					priceKey: {
+						Unit:         "Hrs",
+						PricePerUnit: AWSCurrencyCode{USD: cost},
+					},
+				},
+			},
+			VCpu:   "4",
+			Memory: "16",
+		}
+	}
+
+	t.Run("preemptible node uses spot history when available", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				return &SpotPriceHistoryEntry{
+					SpotPrice:   0.035,
+					Timestamp:   time.Now(),
+					RetrievedAt: time.Now(),
+				}, nil
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+			BaseCPUPrice:          "0.04",
+			BaseRAMPrice:          "0.01",
+			BaseGPUPrice:          "0.95",
+		}
+		terms := makeTerms("SKU123", "JRTCKXETXF", "0.096")
+		// Key with PreemptibleType suffix to trigger isPreemptible
+		key := &awsKey{
+			ProviderID:     "aws:///us-east-1a/i-0123456789abcdef0",
+			SpotLabelName:  "eks.amazonaws.com/capacityType",
+			SpotLabelValue: "SPOT",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+
+		node, meta, err := aws.createNode(terms, PreemptibleType, key)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if node.Cost != "0.035000" {
+			t.Errorf("Expected spot history cost 0.035000, got %s", node.Cost)
+		}
+		if node.UsageType != PreemptibleType {
+			t.Errorf("Expected usage type %s, got %s", PreemptibleType, node.UsageType)
+		}
+		if meta.Source != SpotPriceHistorySource {
+			t.Errorf("Expected source %s, got %s", SpotPriceHistorySource, meta.Source)
+		}
+	})
+
+	t.Run("preemptible node falls back to on-demand when history unavailable", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				return nil, errors.New("no data")
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+			BaseCPUPrice:          "0.04",
+			BaseRAMPrice:          "0.01",
+			BaseGPUPrice:          "0.95",
+		}
+		terms := makeTerms("SKU123", "JRTCKXETXF", "0.096")
+		key := &awsKey{
+			ProviderID:     "aws:///us-east-1a/i-0123456789abcdef0",
+			SpotLabelName:  "eks.amazonaws.com/capacityType",
+			SpotLabelValue: "SPOT",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+
+		node, _, err := aws.createNode(terms, PreemptibleType, key)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if node.Cost != "0.096" {
+			t.Errorf("Expected on-demand cost 0.096, got %s", node.Cost)
+		}
+		if node.UsageType != PreemptibleType {
+			t.Errorf("Expected usage type %s, got %s", PreemptibleType, node.UsageType)
+		}
+	})
+
+	t.Run("preemptible node with nil cache falls back to on-demand", func(t *testing.T) {
+		aws := &AWS{
+			BaseCPUPrice: "0.04",
+			BaseRAMPrice: "0.01",
+			BaseGPUPrice: "0.95",
+		}
+		terms := makeTerms("SKU123", "JRTCKXETXF", "0.096")
+		key := &awsKey{
+			ProviderID:     "aws:///us-east-1a/i-0123456789abcdef0",
+			SpotLabelName:  "eks.amazonaws.com/capacityType",
+			SpotLabelValue: "SPOT",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+
+		node, _, err := aws.createNode(terms, PreemptibleType, key)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if node.Cost != "0.096" {
+			t.Errorf("Expected on-demand cost 0.096, got %s", node.Cost)
+		}
+	})
+
+	t.Run("preemptible node uses base spot prices when no public pricing", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				return nil, errors.New("no data")
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+			BaseCPUPrice:          "0.04",
+			BaseRAMPrice:          "0.01",
+			BaseGPUPrice:          "0.95",
+			BaseSpotCPUPrice:      "0.02",
+			BaseSpotRAMPrice:      "0.005",
+		}
+		// Terms without valid pricing dimensions
+		terms := &AWSProductTerms{
+			Sku: "SKU123",
+			OnDemand: &AWSOfferTerm{
+				Sku:             "SKU123",
+				OfferTermCode:   "JRTCKXETXF",
+				PriceDimensions: map[string]*AWSRateCode{},
+			},
+			VCpu:   "4",
+			Memory: "16",
+		}
+		key := &awsKey{
+			ProviderID:     "aws:///us-east-1a/i-0123456789abcdef0",
+			SpotLabelName:  "eks.amazonaws.com/capacityType",
+			SpotLabelValue: "SPOT",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+
+		node, _, err := aws.createNode(terms, PreemptibleType, key)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if node.VCPUCost != "0.02" {
+			t.Errorf("Expected base spot CPU price 0.02, got %s", node.VCPUCost)
+		}
+		if node.RAMCost != "0.005" {
+			t.Errorf("Expected base spot RAM price 0.005, got %s", node.RAMCost)
+		}
+	})
+}
+
+// TestAWS_PricingSourceStatus_spotPriceHistory checks the Spot Price History
+// entry reported by PricingSourceStatus for an uninitialized cache, a stored
+// initialization error, and an initialized cache.
+func TestAWS_PricingSourceStatus_spotPriceHistory(t *testing.T) {
+	// provider builds an AWS with an empty custom pricing config plus the given
+	// spot price history cache and error.
+	provider := func(cache *SpotPriceHistoryCache, initErr error) *AWS {
+		return &AWS{
+			SpotPriceHistoryCache: cache,
+			SpotPriceHistoryError: initErr,
+			Config: &fakeProviderConfig{
+				customPricing: &models.CustomPricing{},
+			},
+		}
+	}
+
+	t.Run("not yet initialized", func(t *testing.T) {
+		status := provider(nil, nil).PricingSourceStatus()
+
+		sphs, ok := status[SpotPriceHistorySource]
+		if !ok {
+			t.Fatal("Expected SpotPriceHistorySource in sources")
+		}
+		if sphs.Available {
+			t.Error("Expected Available=false when cache not initialized")
+		}
+		if sphs.Error != "Not yet initialized" {
+			t.Errorf("Expected 'Not yet initialized' error, got %q", sphs.Error)
+		}
+	})
+
+	t.Run("initialization error", func(t *testing.T) {
+		status := provider(nil, errors.New("no cluster region configured")).PricingSourceStatus()
+
+		sphs := status[SpotPriceHistorySource]
+		if sphs.Available {
+			t.Error("Expected Available=false on error")
+		}
+		if sphs.Error != "no cluster region configured" {
+			t.Errorf("Expected error message, got %q", sphs.Error)
+		}
+	})
+
+	t.Run("successfully initialized", func(t *testing.T) {
+		cache := NewSpotPriceHistoryCache(&mockSpotPriceHistoryFetcher{})
+		status := provider(cache, nil).PricingSourceStatus()
+
+		if sphs := status[SpotPriceHistorySource]; !sphs.Available {
+			t.Error("Expected Available=true when cache initialized")
+		}
+	})
+}

+ 200 - 0
pkg/cloud/aws/spotpricehistory.go

@@ -0,0 +1,200 @@
+package aws
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	awsSDK "github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/ec2"
+	ec2Types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
+
+	"github.com/opencost/opencost/core/pkg/log"
+)
+
+// SpotPriceHistoryKey identifies one spot price lookup: a region, an instance type, and an
+// availability zone. It is comparable and serves as the map key of SpotPriceHistoryCache.
+type SpotPriceHistoryKey struct {
+	Region           string // selects the regional EC2 client used for the lookup
+	InstanceType     string // sent as the InstanceTypes filter of the API request
+	AvailabilityZone string // sent as the AvailabilityZone filter of the API request
+}
+
+// String renders the key as "region/instanceType/availabilityZone" for use in
+// log and error messages.
+func (key SpotPriceHistoryKey) String() string {
+	return key.Region + "/" + key.InstanceType + "/" + key.AvailabilityZone
+}
+
+const (
+	SpotPriceHistoryCacheAge = 1 * time.Hour // age after which a cache entry, positive or negative, is fetched again
+)
+
+// SpotPriceHistoryEntry holds a cached spot price from the DescribeSpotPriceHistory API.
+type SpotPriceHistoryEntry struct {
+	SpotPrice float64   // price parsed from the API response; left zero in negative entries
+	Timestamp time.Time // timestamp the API reported for this price
+
+	RetrievedAt time.Time // when the entry was fetched; compared against SpotPriceHistoryCacheAge
+	Error       error     // non-nil marks a negative cache entry recording a failed fetch
+}
+
+// shouldRefresh reports whether the entry was retrieved longer ago than
+// SpotPriceHistoryCacheAge and must therefore be fetched again.
+func (spe SpotPriceHistoryEntry) shouldRefresh() bool {
+	age := time.Since(spe.RetrievedAt)
+	return age > SpotPriceHistoryCacheAge
+}
+
+// SpotPriceHistoryCache provides a thread-safe, on-demand cache for spot prices
+// retrieved via the DescribeSpotPriceHistory API. Entries are cached for
+// SpotPriceHistoryCacheAge and include negative caching for errors.
+type SpotPriceHistoryCache struct {
+	cache          map[SpotPriceHistoryKey]*SpotPriceHistoryEntry // latest entry per key, including negative entries
+	mutex          sync.Mutex                                     // guards cache and refreshRunning
+	refreshRunning map[SpotPriceHistoryKey]bool                   // keys with a fetch currently in flight
+	refreshCond    *sync.Cond                                     // built on mutex; broadcast when a fetch finishes
+
+	fetcher SpotPriceHistoryFetcher // source of fresh entries when one is missing or stale
+}
+
+// NewSpotPriceHistoryCache returns an empty cache that obtains missing or
+// stale entries from fetcher.
+func NewSpotPriceHistoryCache(fetcher SpotPriceHistoryFetcher) *SpotPriceHistoryCache {
+	sph := new(SpotPriceHistoryCache)
+	sph.fetcher = fetcher
+	sph.cache = map[SpotPriceHistoryKey]*SpotPriceHistoryEntry{}
+	sph.refreshRunning = map[SpotPriceHistoryKey]bool{}
+	// The condition variable shares the cache mutex, so waiters re-check
+	// refreshRunning under the same lock that protects it.
+	sph.refreshCond = sync.NewCond(&sph.mutex)
+	return sph
+}
+
+// GetSpotPrice returns the entry for (region, instanceType, availabilityZone), calling the fetcher
+// when no entry exists or the cached one is older than SpotPriceHistoryCacheAge. A failed fetch is
+// stored as a negative entry and returned with its error; callers of a key being fetched wait for it.
+func (sph *SpotPriceHistoryCache) GetSpotPrice(region, instanceType, availabilityZone string) (*SpotPriceHistoryEntry, error) {
+	key := SpotPriceHistoryKey{
+		Region:           region,
+		InstanceType:     instanceType,
+		AvailabilityZone: availabilityZone,
+	}
+	sph.mutex.Lock()
+	for sph.refreshRunning[key] { // another caller is already fetching this key
+		sph.refreshCond.Wait() // releases the mutex while blocked; woken by the Broadcast below
+	}
+	// Serve from the cache while the entry is fresh; this includes negative entries and their error.
+	entry, exists := sph.cache[key]
+	if exists && !entry.shouldRefresh() {
+		sph.mutex.Unlock()
+		return entry, entry.Error
+	}
+	// Missing or stale entry: claim the refresh for this key, then fetch without holding the mutex.
+	sph.refreshRunning[key] = true
+	sph.mutex.Unlock()
+
+	// Ensure refreshRunning is always cleared, even if the fetcher panics.
+	defer func() {
+		sph.mutex.Lock()
+		delete(sph.refreshRunning, key)
+		sph.refreshCond.Broadcast() // wakes every waiting caller; each re-checks its own key
+		sph.mutex.Unlock()
+	}()
+
+	// Fetch the entry with the mutex released, so lookups for other keys are not blocked.
+	entry, err := sph.fetcher.FetchSpotPrice(key)
+	if err != nil || entry == nil {
+		// A fetch error or a nil entry becomes a negative cache entry carrying the error.
+		if err == nil {
+			err = fmt.Errorf("fetcher returned nil entry for %s", key)
+		}
+		entry = &SpotPriceHistoryEntry{
+			RetrievedAt: time.Now(),
+			Error:       err,
+		}
+	} else {
+		// Normalize cache metadata so cache freshness does not depend on
+		// the fetcher setting these fields correctly.
+		entry.RetrievedAt = time.Now()
+		entry.Error = nil
+	}
+
+	// Store the result before the deferred cleanup runs, so woken waiters find it in the cache.
+	sph.mutex.Lock()
+	sph.cache[key] = entry
+	sph.mutex.Unlock()
+	return entry, entry.Error
+}
+
+// SpotPriceHistoryFetcher is the interface for fetching spot prices from the
+// DescribeSpotPriceHistory API (or a mock for testing).
+type SpotPriceHistoryFetcher interface {
+	FetchSpotPrice(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) // GetSpotPrice treats a (nil, nil) result as an error
+}
+
+// AWSSpotPriceHistoryFetcher implements SpotPriceHistoryFetcher using the real
+// AWS EC2 DescribeSpotPriceHistory API. It maintains a pool of per-region
+// EC2 clients.
+type AWSSpotPriceHistoryFetcher struct {
+	awsConfig       awsSDK.Config          // base config; copied with Region overridden for each regional client
+	ec2ClientsMutex sync.Mutex             // guards ec2Clients
+	ec2Clients      map[string]*ec2.Client // EC2 clients keyed by region, created on first use
+}
+
+// NewAWSSpotPriceHistoryFetcher returns a fetcher that derives its per-region
+// EC2 clients from awsConfig. No client is created until the first lookup.
+func NewAWSSpotPriceHistoryFetcher(awsConfig awsSDK.Config) *AWSSpotPriceHistoryFetcher {
+	fetcher := new(AWSSpotPriceHistoryFetcher)
+	fetcher.awsConfig = awsConfig
+	fetcher.ec2Clients = map[string]*ec2.Client{}
+	return fetcher
+}
+
+// getEC2Client returns the EC2 client for region, creating and remembering it
+// on first use. The whole lookup runs under ec2ClientsMutex.
+func (a *AWSSpotPriceHistoryFetcher) getEC2Client(region string) *ec2.Client {
+	a.ec2ClientsMutex.Lock()
+	defer a.ec2ClientsMutex.Unlock()
+
+	client, found := a.ec2Clients[region]
+	if !found {
+		// Work on a copy of the base config so its Region field is left untouched.
+		regional := a.awsConfig
+		regional.Region = region
+		client = ec2.NewFromConfig(regional)
+		a.ec2Clients[region] = client
+	}
+	return client
+}
+
+// FetchSpotPrice queries DescribeSpotPriceHistory in key.Region for the
+// Linux/UNIX (Amazon VPC) spot price of key.InstanceType in
+// key.AvailabilityZone. The call is bounded by a 10 second timeout. It returns
+// an error when the API call fails, when no record is returned, when the
+// record lacks a price or timestamp, or when the price cannot be parsed.
+func (a *AWSSpotPriceHistoryFetcher) FetchSpotPrice(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+	log.Debugf("Retrieving spot price history for %s", key)
+	client := a.getEC2Client(key.Region)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	input := &ec2.DescribeSpotPriceHistoryInput{
+		InstanceTypes:    []ec2Types.InstanceType{ec2Types.InstanceType(key.InstanceType)},
+		AvailabilityZone: awsSDK.String(key.AvailabilityZone),
+		// Only retrieve Linux/UNIX (Amazon VPC) prices. The non-VPC
+		// "Linux/UNIX" variant was for EC2-Classic, which was fully retired in
+		// August 2023.
+		ProductDescriptions: []string{
+			"Linux/UNIX (Amazon VPC)",
+		},
+		// Ask for the price in effect right now. Given a start time, the API
+		// also returns the last price change before it, so the single result
+		// requested below does not depend on how history records are ordered.
+		StartTime: awsSDK.Time(time.Now()),
+		// Only retrieve the latest price.
+		MaxResults: awsSDK.Int32(1),
+	}
+
+	resp, err := client.DescribeSpotPriceHistory(ctx, input)
+	if err != nil {
+		return nil, fmt.Errorf("describing spot price history for %s: %w", key, err)
+	}
+	if len(resp.SpotPriceHistory) == 0 {
+		return nil, fmt.Errorf("no spot price history found for %s", key)
+	}
+	spotPrice := resp.SpotPriceHistory[0]
+
+	// Both fields are pointers in the SDK type; reject the record before dereferencing.
+	if spotPrice.SpotPrice == nil || spotPrice.Timestamp == nil {
+		return nil, fmt.Errorf("missing required spot price history data for %s (SpotPrice=%v, Timestamp=%v)", key, spotPrice.SpotPrice, spotPrice.Timestamp)
+	}
+	price, err := strconv.ParseFloat(*spotPrice.SpotPrice, 64)
+	if err != nil {
+		return nil, fmt.Errorf("parsing spot price for %s: %w", key, err)
+	}
+	return &SpotPriceHistoryEntry{
+		SpotPrice:   price,
+		Timestamp:   *spotPrice.Timestamp,
+		RetrievedAt: time.Now(),
+	}, nil
+}

+ 239 - 0
pkg/cloud/aws/spotpricehistory_test.go

@@ -0,0 +1,239 @@
+package aws
+
+import (
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+// mockSpotPriceHistoryFetcher is a test double for the spot price fetcher.
+// When fetchFunc is set, FetchSpotPrice delegates to it; otherwise a fixed
+// entry priced at 0.05 is returned.
+type mockSpotPriceHistoryFetcher struct {
+	fetchFunc func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error)
+}
+
+// FetchSpotPrice returns the result of fetchFunc when one is configured, and
+// a default entry with a nil error otherwise.
+func (m *mockSpotPriceHistoryFetcher) FetchSpotPrice(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+	if m.fetchFunc == nil {
+		defaultEntry := new(SpotPriceHistoryEntry)
+		defaultEntry.SpotPrice = 0.05
+		defaultEntry.Timestamp = time.Now()
+		defaultEntry.RetrievedAt = time.Now()
+		return defaultEntry, nil
+	}
+	return m.fetchFunc(key)
+}
+
+// TestSpotPriceHistoryCache_GetSpotPrice_CacheHit seeds the cache with a
+// freshly retrieved entry and checks that GetSpotPrice returns that entry's
+// price (0.08) rather than the mock fetcher's default (0.05).
+func TestSpotPriceHistoryCache_GetSpotPrice_CacheHit(t *testing.T) {
+	mockFetcher := &mockSpotPriceHistoryFetcher{}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	region := "us-west-2"
+	instanceType := "m5.large"
+	availabilityZone := "us-west-2a"
+
+	key := SpotPriceHistoryKey{
+		Region:           region,
+		InstanceType:     instanceType,
+		AvailabilityZone: availabilityZone,
+	}
+
+	cachedEntry := &SpotPriceHistoryEntry{
+		SpotPrice:   0.08,
+		Timestamp:   time.Now(),
+		RetrievedAt: time.Now(),
+	}
+	cache.cache[key] = cachedEntry
+
+	entry, err := cache.GetSpotPrice(region, instanceType, availabilityZone)
+	if err != nil {
+		// Stop here: the entry may be nil after an error, and reading it
+		// below would panic and hide this message.
+		t.Fatalf("Expected no error, got %v", err)
+	}
+	if entry.SpotPrice != 0.08 {
+		t.Errorf("Expected spot price 0.08, got %f", entry.SpotPrice)
+	}
+}
+
+// TestSpotPriceHistoryCache_GetSpotPrice_CacheMiss checks that a lookup on an
+// empty cache invokes the fetcher and returns the fetched price.
+func TestSpotPriceHistoryCache_GetSpotPrice_CacheMiss(t *testing.T) {
+	fetchCalled := false
+	mockFetcher := &mockSpotPriceHistoryFetcher{
+		fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+			fetchCalled = true
+			return &SpotPriceHistoryEntry{
+				SpotPrice:   0.12,
+				Timestamp:   time.Now(),
+				RetrievedAt: time.Now(),
+			}, nil
+		},
+	}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	entry, err := cache.GetSpotPrice("us-west-2", "m5.large", "us-west-2a")
+	if err != nil {
+		// Stop here: the entry may be nil after an error, and reading it
+		// below would panic and hide this message.
+		t.Fatalf("Expected no error, got %v", err)
+	}
+	if !fetchCalled {
+		t.Error("Expected fetcher to be called for cache miss")
+	}
+	if entry.SpotPrice != 0.12 {
+		t.Errorf("Expected spot price 0.12, got %f", entry.SpotPrice)
+	}
+}
+
+// TestSpotPriceHistoryCache_GetSpotPrice_ConcurrentSameKey issues ten
+// concurrent lookups for the same key against an empty cache and checks that
+// every caller sees the fetched price while the fetcher runs exactly once.
+func TestSpotPriceHistoryCache_GetSpotPrice_ConcurrentSameKey(t *testing.T) {
+	var fetchCount atomic.Int32
+	mockFetcher := &mockSpotPriceHistoryFetcher{
+		fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+			fetchCount.Add(1)
+			// Simulate slow API call to increase chance of concurrent access
+			time.Sleep(50 * time.Millisecond)
+			return &SpotPriceHistoryEntry{
+				SpotPrice:   0.07,
+				Timestamp:   time.Now(),
+				RetrievedAt: time.Now(),
+			}, nil
+		},
+	}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	const goroutines = 10
+	var wg sync.WaitGroup
+	wg.Add(goroutines)
+	for i := 0; i < goroutines; i++ {
+		go func() {
+			defer wg.Done()
+			entry, err := cache.GetSpotPrice("us-west-2", "m5.large", "us-west-2a")
+			if err != nil {
+				// Fatalf must not be called off the test goroutine, so report
+				// and return: the entry may be nil after an error, and a panic
+				// in this goroutine would abort the whole test binary.
+				t.Errorf("Expected no error, got %v", err)
+				return
+			}
+			if entry.SpotPrice != 0.07 {
+				t.Errorf("Expected spot price 0.07, got %f", entry.SpotPrice)
+			}
+		}()
+	}
+	wg.Wait()
+
+	if count := fetchCount.Load(); count != 1 {
+		t.Errorf("Expected exactly 1 fetch call, got %d", count)
+	}
+}
+
+// TestSpotPriceHistoryCache_GetSpotPrice_StaleEntry seeds the cache with an
+// entry retrieved two hours ago and checks that GetSpotPrice calls the
+// fetcher and returns the refreshed price instead of the cached one.
+func TestSpotPriceHistoryCache_GetSpotPrice_StaleEntry(t *testing.T) {
+	fetchCalled := false
+	mockFetcher := &mockSpotPriceHistoryFetcher{
+		fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+			fetchCalled = true
+			return &SpotPriceHistoryEntry{
+				SpotPrice:   0.15,
+				Timestamp:   time.Now(),
+				RetrievedAt: time.Now(),
+			}, nil
+		},
+	}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	key := SpotPriceHistoryKey{
+		Region:           "us-west-2",
+		InstanceType:     "m5.large",
+		AvailabilityZone: "us-west-2a",
+	}
+
+	staleEntry := &SpotPriceHistoryEntry{
+		SpotPrice:   0.08,
+		Timestamp:   time.Now(),
+		RetrievedAt: time.Now().Add(-2 * time.Hour),
+	}
+	cache.cache[key] = staleEntry
+
+	entry, err := cache.GetSpotPrice("us-west-2", "m5.large", "us-west-2a")
+	if err != nil {
+		// Stop here: the entry may be nil after an error, and reading it
+		// below would panic and hide this message.
+		t.Fatalf("Expected no error, got %v", err)
+	}
+	if !fetchCalled {
+		t.Error("Expected fetcher to be called for stale entry")
+	}
+	if entry.SpotPrice != 0.15 {
+		t.Errorf("Expected refreshed spot price 0.15, got %f", entry.SpotPrice)
+	}
+}
+
+// TestSpotPriceHistoryCache_GetSpotPrice_FetchError checks that a fetcher
+// failure is returned to the caller (matching via errors.Is) and that the
+// same error is recorded on the cache entry stored for the key.
+func TestSpotPriceHistoryCache_GetSpotPrice_FetchError(t *testing.T) {
+	expectedError := errors.New("fetch failed")
+	mockFetcher := &mockSpotPriceHistoryFetcher{
+		fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+			return nil, expectedError
+		},
+	}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	_, err := cache.GetSpotPrice("us-west-2", "m5.large", "us-west-2a")
+	if err == nil {
+		t.Fatal("Expected error from failed fetch")
+	}
+	if !errors.Is(err, expectedError) {
+		t.Errorf("Expected error %v, got %v", expectedError, err)
+	}
+
+	key := SpotPriceHistoryKey{
+		Region:           "us-west-2",
+		InstanceType:     "m5.large",
+		AvailabilityZone: "us-west-2a",
+	}
+	cachedEntry, ok := cache.cache[key]
+	if !ok || cachedEntry == nil {
+		// Without this guard a missing entry would surface as a nil-pointer
+		// panic on cachedEntry.Error instead of a readable failure.
+		t.Fatalf("Expected a cached entry for %s after failed fetch, found none", key)
+	}
+	if !errors.Is(cachedEntry.Error, expectedError) {
+		t.Errorf("Expected cached entry error %v, got %v", expectedError, cachedEntry.Error)
+	}
+}
+
+func TestSpotPriceHistoryEntry_shouldRefresh(t *testing.T) {
+	now := time.Now()
+
+	tests := []struct {
+		name        string
+		retrievedAt time.Time
+		expected    bool
+	}{
+		{
+			name:        "fresh entry",
+			retrievedAt: now,
+			expected:    false,
+		},
+		{
+			name:        "stale entry",
+			retrievedAt: now.Add(-2 * time.Hour),
+			expected:    true,
+		},
+		{
+			name:        "borderline entry",
+			retrievedAt: now.Add(-SpotPriceHistoryCacheAge + 1*time.Minute),
+			expected:    false,
+		},
+		{
+			name:        "expired entry",
+			retrievedAt: now.Add(-SpotPriceHistoryCacheAge - 1*time.Minute),
+			expected:    true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			entry := SpotPriceHistoryEntry{
+				RetrievedAt: tt.retrievedAt,
+			}
+			if got := entry.shouldRefresh(); got != tt.expected {
+				t.Errorf("shouldRefresh() = %v, want %v", got, tt.expected)
+			}
+		})
+	}
+}
+
+func TestSpotPriceHistoryKey_String(t *testing.T) {
+	key := SpotPriceHistoryKey{
+		Region:           "us-west-2",
+		InstanceType:     "m5.large",
+		AvailabilityZone: "us-west-2a",
+	}
+	expected := "us-west-2/m5.large/us-west-2a"
+	if got := key.String(); got != expected {
+		t.Errorf("String() = %v, want %v", got, expected)
+	}
+}