Browse Source

Add AWS Spot Price History API caching layer (#3731)

Signed-off-by: Warwick Peatey <warwick@automatic.systems>
Warwick 4 tuần trước cách đây
mục cha
commit
ef18db763d

+ 124 - 28
pkg/cloud/aws/provider.go

@@ -53,6 +53,7 @@ const (
 
 	APIPricingSource              = "Public API"
 	SpotPricingSource             = "Spot Data Feed"
+	SpotPriceHistorySource        = "Spot Price History"
 	ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
 	FargatePricingSource          = "Fargate"
 
@@ -96,11 +97,7 @@ func (aws *AWS) PricingSourceStatus() map[string]*models.PricingSource {
 		Enabled: true,
 	}
 
-	if !aws.SpotRefreshEnabled() {
-		sps.Available = false
-		sps.Error = "Spot instances not set up"
-		sps.Enabled = false
-	} else {
+	if aws.SpotFeedRefreshEnabled() {
 		sps.Error = ""
 		if aws.SpotPricingError != nil {
 			sps.Error = aws.SpotPricingError.Error()
@@ -112,9 +109,28 @@ func (aws *AWS) PricingSourceStatus() map[string]*models.PricingSource {
 		} else {
 			sps.Error = "No spot instances detected"
 		}
+	} else {
+		sps.Available = false
+		sps.Error = "Spot instances not set up"
+		sps.Enabled = false
 	}
 	sources[SpotPricingSource] = sps
 
+	sphs := &models.PricingSource{
+		Name:    SpotPriceHistorySource,
+		Enabled: true,
+	}
+	if aws.SpotPriceHistoryError != nil {
+		sphs.Error = aws.SpotPriceHistoryError.Error()
+		sphs.Available = false
+	} else if aws.SpotPriceHistoryCache == nil {
+		sphs.Error = "Not yet initialized"
+		sphs.Available = false
+	} else {
+		sphs.Available = true
+	}
+	sources[SpotPriceHistorySource] = sphs
+
 	rps := &models.PricingSource{
 		Name:    ReservedInstancePricingSource,
 		Enabled: true,
@@ -185,6 +201,8 @@ type AWS struct {
 	SpotRefreshRunning          bool
 	SpotPricingLock             sync.RWMutex
 	SpotPricingError            error
+	SpotPriceHistoryCache       *SpotPriceHistoryCache
+	SpotPriceHistoryError       error
 	RIPricingByInstanceID       map[string]*RIData
 	RIPricingError              error
 	RIDataRunning               bool
@@ -848,8 +866,8 @@ func (aws *AWS) getRegionPricing(nodeList []*clustercache.Node) (*http.Response,
 	return resp, pricingURL, err
 }
 
-// SpotRefreshEnabled determines whether the required configs to run the spot feed query have been set up
-func (aws *AWS) SpotRefreshEnabled() bool {
+// SpotFeedRefreshEnabled determines whether the required configs to run the spot feed query have been set up
+func (aws *AWS) SpotFeedRefreshEnabled() bool {
 	// Guard against nil receiver
 	if aws == nil {
 		return false
@@ -1019,28 +1037,36 @@ func (aws *AWS) DownloadPricingData() error {
 	}
 	log.Infof("Finished downloading \"%s\"", pricingURL)
 
-	if !aws.SpotRefreshEnabled() {
-		return nil
+	// Initialize a spot price history cache if not already initialized.
+	// Reset error to allow retrying on subsequent DownloadPricingData calls.
+	if aws.SpotPriceHistoryCache == nil {
+		aws.SpotPriceHistoryError = nil
+		aws.SpotPriceHistoryCache, aws.SpotPriceHistoryError = aws.initializeSpotPriceHistoryCache()
+		if aws.SpotPriceHistoryError != nil {
+			log.Errorf("Failed to initialize spot price history manager: %v", aws.SpotPriceHistoryError)
+		}
 	}
 
-	// Always run spot pricing refresh when performing download
-	aws.refreshSpotPricing(true)
+	if aws.SpotFeedRefreshEnabled() {
+		// Always run spot pricing refresh when performing download
+		aws.refreshSpotPricing(true)
 
-	// Only start a single refresh goroutine
-	if !aws.SpotRefreshRunning {
-		aws.SpotRefreshRunning = true
+		// Only start a single refresh goroutine
+		if !aws.SpotRefreshRunning {
+			aws.SpotRefreshRunning = true
 
-		go func() {
-			defer errs.HandlePanic()
+			go func() {
+				defer errs.HandlePanic()
 
-			for {
-				log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
-				time.Sleep(SpotRefreshDuration)
+				for {
+					log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
+					time.Sleep(SpotRefreshDuration)
 
-				// Reoccurring refresh checks update times
-				aws.refreshSpotPricing(false)
-			}
-		}()
+					// Reoccurring refresh checks update times
+					aws.refreshSpotPricing(false)
+				}
+			}()
+		}
 	}
 
 	return nil
@@ -1278,6 +1304,60 @@ func (aws *AWS) refreshSpotPricing(force bool) {
 	aws.SpotPricingByInstanceID = sp
 }
 
+func (aws *AWS) initializeSpotPriceHistoryCache() (*SpotPriceHistoryCache, error) {
+	log.Info("Initializing AWS Spot Price History Manager")
+
+	// Get AWS access key for creating config
+	accessKey, err := aws.GetAWSAccessKey()
+	if err != nil {
+		return nil, fmt.Errorf("getting AWS access key for spot price history: %w", err)
+	}
+
+	// Use the cluster region to create the initial AWS config and credentials.
+	// The SpotPriceHistoryFetcher itself can query multiple regions by creating
+	// region-specific EC2 clients as needed.
+	if aws.ClusterRegion == "" {
+		return nil, fmt.Errorf("no cluster region configured")
+	}
+
+	// Create config for the cluster region
+	awsConfig, err := accessKey.CreateConfig(aws.ClusterRegion)
+	if err != nil {
+		return nil, fmt.Errorf("creating AWS config for spot price history: %w", err)
+	}
+
+	return NewSpotPriceHistoryCache(NewAWSSpotPriceHistoryFetcher(awsConfig)), nil
+}
+
+func (aws *AWS) spotPricingFromHistory(k models.Key) (*SpotPriceHistoryEntry, bool) {
+	if aws.SpotPriceHistoryCache == nil {
+		return nil, false
+	}
+
+	// Extract region, instance type, and availability zone from the key
+	awsKey, ok := k.(*awsKey)
+	if !ok {
+		log.DedupedWarningf(10, "Failed to cast key to awsKey for spot price history lookup: %s", k.ID())
+		return nil, false
+	}
+
+	region, regionOk := util.GetRegion(awsKey.Labels)
+	instanceType, instanceTypeOk := util.GetInstanceType(awsKey.Labels)
+	availabilityZone, availabilityZoneOk := util.GetZone(awsKey.Labels)
+	// Skip lookup if any required information is missing
+	if !regionOk || !instanceTypeOk || !availabilityZoneOk {
+		log.DedupedWarningf(10, "Missing required info for spot price history lookup (region: %s, instanceType: %s, zone: %s): %s", region, instanceType, availabilityZone, k.ID())
+		return nil, false
+	}
+
+	price, err := aws.SpotPriceHistoryCache.GetSpotPrice(region, instanceType, availabilityZone)
+	if err != nil {
+		log.DedupedWarningf(10, "Failed to get spot price history for instance %s: %s", k.ID(), err.Error())
+		return nil, false
+	}
+	return price, true
+}
+
 // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
 func (aws *AWS) NetworkPricing() (*models.Network, error) {
 	cpricing, err := aws.Config.GetCustomPricingData()
@@ -1403,11 +1483,29 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
 			UsageType:    PreemptibleType,
 		}, meta, nil
 	} else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
-		if aws.SpotRefreshEnabled() {
-			log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
+		log.DedupedWarningf(5, "Node %s marked preemptible but no spot feed data available; falling back to other pricing sources", k.ID())
+
+		// Try to get spot pricing from DescribeSpotPriceHistory API
+		if historyEntry, ok := aws.spotPricingFromHistory(k); ok {
+			log.DedupedInfof(5, "Using spot price history data for node %s: $%f", k.ID(), historyEntry.SpotPrice)
+			spotHistoryCost := fmt.Sprintf("%f", historyEntry.SpotPrice)
+			meta.Source = SpotPriceHistorySource
+			return &models.Node{
+				Cost:         spotHistoryCost,
+				VCPU:         terms.VCpu,
+				RAM:          terms.Memory,
+				GPU:          terms.GPU,
+				Storage:      terms.Storage,
+				BaseCPUPrice: aws.BaseCPUPrice,
+				BaseRAMPrice: aws.BaseRAMPrice,
+				BaseGPUPrice: aws.BaseGPUPrice,
+				UsageType:    PreemptibleType,
+			}, meta, nil
 		}
+
 		if publicPricingFound {
 			// return public price if found
+			log.DedupedWarningf(5, "No spot price history available for %s, falling back to on-demand pricing", k.ID())
 			return &models.Node{
 				Cost:         cost,
 				VCPU:         terms.VCpu,
@@ -1421,9 +1519,7 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
 			}, meta, nil
 		} else {
 			// return defaults if public pricing not found
-			if aws.SpotRefreshEnabled() {
-				log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
-			}
+			log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
 			return &models.Node{
 				VCPU:         terms.VCpu,
 				VCPUCost:     aws.BaseSpotCPUPrice,

+ 377 - 7
pkg/cloud/aws/provider_test.go

@@ -2,12 +2,14 @@ package aws
 
 import (
 	"encoding/json"
+	"errors"
 	"io"
 	"net/http"
 	"net/url"
 	"os"
 	"reflect"
 	"testing"
+	"time"
 
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/pkg/cloud/models"
@@ -867,7 +869,7 @@ func (f *fakeProviderConfig) ConfigFileManager() *config.ConfigFileManager {
 	return nil
 }
 
-func TestAWS_SpotRefreshEnabled(t *testing.T) {
+func TestAWS_SpotFeedRefreshEnabled(t *testing.T) {
 	tests := []struct {
 		name                string
 		spotDataBucket      string
@@ -955,9 +957,9 @@ func TestAWS_SpotRefreshEnabled(t *testing.T) {
 				},
 			}
 
-			got := aws.SpotRefreshEnabled()
+			got := aws.SpotFeedRefreshEnabled()
 			if got != tt.want {
-				t.Errorf("AWS.SpotRefreshEnabled() = %v, want %v", got, tt.want)
+				t.Errorf("AWS.SpotFeedRefreshEnabled() = %v, want %v", got, tt.want)
 			}
 		})
 	}
@@ -971,10 +973,10 @@ func TestAWS_SpotRefreshEnabled(t *testing.T) {
 			Config:         nil, // nil Config should not cause panic
 		}
 
-		got := aws.SpotRefreshEnabled()
+		got := aws.SpotFeedRefreshEnabled()
 		want := true // Should fall back to field-based check
 		if got != want {
-			t.Errorf("AWS.SpotRefreshEnabled() with nil Config = %v, want %v", got, want)
+			t.Errorf("AWS.SpotFeedRefreshEnabled() with nil Config = %v, want %v", got, want)
 		}
 	})
 
@@ -986,10 +988,378 @@ func TestAWS_SpotRefreshEnabled(t *testing.T) {
 			Config:         nil, // nil Config should not cause panic
 		}
 
-		got := aws.SpotRefreshEnabled()
+		got := aws.SpotFeedRefreshEnabled()
 		want := false // No fields set, should return false
 		if got != want {
-			t.Errorf("AWS.SpotRefreshEnabled() with nil Config and no fields = %v, want %v", got, want)
+			t.Errorf("AWS.SpotFeedRefreshEnabled() with nil Config and no fields = %v, want %v", got, want)
+		}
+	})
+}
+
+func TestAWS_spotPricingFromHistory(t *testing.T) {
+	t.Run("nil cache returns false", func(t *testing.T) {
+		aws := &AWS{}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when cache is nil")
+		}
+	})
+
+	t.Run("missing region label returns false", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when region label is missing")
+		}
+	})
+
+	t.Run("missing instance type label returns false", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region": "us-east-1",
+				"topology.kubernetes.io/zone":   "us-east-1a",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when instance type label is missing")
+		}
+	})
+
+	t.Run("missing zone label returns false", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"node.kubernetes.io/instance-type": "m5.large",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when zone label is missing")
+		}
+	})
+
+	t.Run("fetcher error returns false", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				return nil, errors.New("api error")
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+			},
+		}
+		_, ok := aws.spotPricingFromHistory(key)
+		if ok {
+			t.Error("Expected false when fetcher returns error")
+		}
+	})
+
+	t.Run("successful lookup returns entry", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				if key.Region != "us-east-1" || key.InstanceType != "m5.large" || key.AvailabilityZone != "us-east-1a" {
+					t.Errorf("Unexpected key: %v", key)
+				}
+				return &SpotPriceHistoryEntry{
+					SpotPrice:   0.042,
+					Timestamp:   time.Now(),
+					RetrievedAt: time.Now(),
+				}, nil
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+		}
+		key := &awsKey{
+			ProviderID: "aws:///us-east-1a/i-0123456789abcdef0",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+			},
+		}
+		entry, ok := aws.spotPricingFromHistory(key)
+		if !ok {
+			t.Fatal("Expected true for successful lookup")
+		}
+		if entry.SpotPrice != 0.042 {
+			t.Errorf("Expected spot price 0.042, got %f", entry.SpotPrice)
+		}
+	})
+}
+
+func TestAWS_createNode_spotHistoryFallback(t *testing.T) {
+	// Helper to build AWSProductTerms with on-demand pricing
+	makeTerms := func(sku, offerTermCode, cost string) *AWSProductTerms {
+		priceKey := sku + "." + offerTermCode + "." + HourlyRateCode
+		return &AWSProductTerms{
+			Sku: sku,
+			OnDemand: &AWSOfferTerm{
+				Sku:           sku,
+				OfferTermCode: offerTermCode,
+				PriceDimensions: map[string]*AWSRateCode{
+					priceKey: {
+						Unit:         "Hrs",
+						PricePerUnit: AWSCurrencyCode{USD: cost},
+					},
+				},
+			},
+			VCpu:   "4",
+			Memory: "16",
+		}
+	}
+
+	t.Run("preemptible node uses spot history when available", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				return &SpotPriceHistoryEntry{
+					SpotPrice:   0.035,
+					Timestamp:   time.Now(),
+					RetrievedAt: time.Now(),
+				}, nil
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+			BaseCPUPrice:          "0.04",
+			BaseRAMPrice:          "0.01",
+			BaseGPUPrice:          "0.95",
+		}
+		terms := makeTerms("SKU123", "JRTCKXETXF", "0.096")
+		// Key with PreemptibleType suffix to trigger isPreemptible
+		key := &awsKey{
+			ProviderID:     "aws:///us-east-1a/i-0123456789abcdef0",
+			SpotLabelName:  "eks.amazonaws.com/capacityType",
+			SpotLabelValue: "SPOT",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+
+		node, meta, err := aws.createNode(terms, PreemptibleType, key)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if node.Cost != "0.035000" {
+			t.Errorf("Expected spot history cost 0.035000, got %s", node.Cost)
+		}
+		if node.UsageType != PreemptibleType {
+			t.Errorf("Expected usage type %s, got %s", PreemptibleType, node.UsageType)
+		}
+		if meta.Source != SpotPriceHistorySource {
+			t.Errorf("Expected source %s, got %s", SpotPriceHistorySource, meta.Source)
+		}
+	})
+
+	t.Run("preemptible node falls back to on-demand when history unavailable", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				return nil, errors.New("no data")
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+			BaseCPUPrice:          "0.04",
+			BaseRAMPrice:          "0.01",
+			BaseGPUPrice:          "0.95",
+		}
+		terms := makeTerms("SKU123", "JRTCKXETXF", "0.096")
+		key := &awsKey{
+			ProviderID:     "aws:///us-east-1a/i-0123456789abcdef0",
+			SpotLabelName:  "eks.amazonaws.com/capacityType",
+			SpotLabelValue: "SPOT",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+
+		node, _, err := aws.createNode(terms, PreemptibleType, key)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if node.Cost != "0.096" {
+			t.Errorf("Expected on-demand cost 0.096, got %s", node.Cost)
+		}
+		if node.UsageType != PreemptibleType {
+			t.Errorf("Expected usage type %s, got %s", PreemptibleType, node.UsageType)
+		}
+	})
+
+	t.Run("preemptible node with nil cache falls back to on-demand", func(t *testing.T) {
+		aws := &AWS{
+			BaseCPUPrice: "0.04",
+			BaseRAMPrice: "0.01",
+			BaseGPUPrice: "0.95",
+		}
+		terms := makeTerms("SKU123", "JRTCKXETXF", "0.096")
+		key := &awsKey{
+			ProviderID:     "aws:///us-east-1a/i-0123456789abcdef0",
+			SpotLabelName:  "eks.amazonaws.com/capacityType",
+			SpotLabelValue: "SPOT",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+
+		node, _, err := aws.createNode(terms, PreemptibleType, key)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if node.Cost != "0.096" {
+			t.Errorf("Expected on-demand cost 0.096, got %s", node.Cost)
+		}
+	})
+
+	t.Run("preemptible node uses base spot prices when no public pricing", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{
+			fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+				return nil, errors.New("no data")
+			},
+		}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+			BaseCPUPrice:          "0.04",
+			BaseRAMPrice:          "0.01",
+			BaseGPUPrice:          "0.95",
+			BaseSpotCPUPrice:      "0.02",
+			BaseSpotRAMPrice:      "0.005",
+		}
+		// Terms without valid pricing dimensions
+		terms := &AWSProductTerms{
+			Sku: "SKU123",
+			OnDemand: &AWSOfferTerm{
+				Sku:             "SKU123",
+				OfferTermCode:   "JRTCKXETXF",
+				PriceDimensions: map[string]*AWSRateCode{},
+			},
+			VCpu:   "4",
+			Memory: "16",
+		}
+		key := &awsKey{
+			ProviderID:     "aws:///us-east-1a/i-0123456789abcdef0",
+			SpotLabelName:  "eks.amazonaws.com/capacityType",
+			SpotLabelValue: "SPOT",
+			Labels: map[string]string{
+				"topology.kubernetes.io/region":    "us-east-1",
+				"topology.kubernetes.io/zone":      "us-east-1a",
+				"node.kubernetes.io/instance-type": "m5.large",
+				"kubernetes.io/os":                 "linux",
+				"eks.amazonaws.com/capacityType":   "SPOT",
+			},
+		}
+
+		node, _, err := aws.createNode(terms, PreemptibleType, key)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if node.VCPUCost != "0.02" {
+			t.Errorf("Expected base spot CPU price 0.02, got %s", node.VCPUCost)
+		}
+		if node.RAMCost != "0.005" {
+			t.Errorf("Expected base spot RAM price 0.005, got %s", node.RAMCost)
+		}
+	})
+}
+
+func TestAWS_PricingSourceStatus_spotPriceHistory(t *testing.T) {
+	t.Run("not yet initialized", func(t *testing.T) {
+		aws := &AWS{
+			Config: &fakeProviderConfig{
+				customPricing: &models.CustomPricing{},
+			},
+		}
+		sources := aws.PricingSourceStatus()
+		sphs, ok := sources[SpotPriceHistorySource]
+		if !ok {
+			t.Fatal("Expected SpotPriceHistorySource in sources")
+		}
+		if sphs.Available {
+			t.Error("Expected Available=false when cache not initialized")
+		}
+		if sphs.Error != "Not yet initialized" {
+			t.Errorf("Expected 'Not yet initialized' error, got %q", sphs.Error)
+		}
+	})
+
+	t.Run("initialization error", func(t *testing.T) {
+		aws := &AWS{
+			SpotPriceHistoryError: errors.New("no cluster region configured"),
+			Config: &fakeProviderConfig{
+				customPricing: &models.CustomPricing{},
+			},
+		}
+		sources := aws.PricingSourceStatus()
+		sphs := sources[SpotPriceHistorySource]
+		if sphs.Available {
+			t.Error("Expected Available=false on error")
+		}
+		if sphs.Error != "no cluster region configured" {
+			t.Errorf("Expected error message, got %q", sphs.Error)
+		}
+	})
+
+	t.Run("successfully initialized", func(t *testing.T) {
+		mockFetcher := &mockSpotPriceHistoryFetcher{}
+		aws := &AWS{
+			SpotPriceHistoryCache: NewSpotPriceHistoryCache(mockFetcher),
+			Config: &fakeProviderConfig{
+				customPricing: &models.CustomPricing{},
+			},
+		}
+		sources := aws.PricingSourceStatus()
+		sphs := sources[SpotPriceHistorySource]
+		if !sphs.Available {
+			t.Error("Expected Available=true when cache initialized")
 		}
 	})
 }

+ 200 - 0
pkg/cloud/aws/spotpricehistory.go

@@ -0,0 +1,200 @@
+package aws
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	awsSDK "github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/ec2"
+	ec2Types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
+
+	"github.com/opencost/opencost/core/pkg/log"
+)
+
+// SpotPriceHistoryKey uniquely identifies a spot price lookup by region,
+// instance type, and availability zone.
+type SpotPriceHistoryKey struct {
+	Region           string
+	InstanceType     string
+	AvailabilityZone string
+}
+
+func (key SpotPriceHistoryKey) String() string {
+	return fmt.Sprintf("%s/%s/%s", key.Region, key.InstanceType, key.AvailabilityZone)
+}
+
+const (
+	SpotPriceHistoryCacheAge = 1 * time.Hour
+)
+
+// SpotPriceHistoryEntry holds a cached spot price from the DescribeSpotPriceHistory API.
+type SpotPriceHistoryEntry struct {
+	SpotPrice float64
+	Timestamp time.Time
+
+	RetrievedAt time.Time
+	Error       error // Negative cache
+}
+
+func (spe SpotPriceHistoryEntry) shouldRefresh() bool {
+	return time.Since(spe.RetrievedAt) > SpotPriceHistoryCacheAge
+}
+
+// SpotPriceHistoryCache provides a thread-safe, on-demand cache for spot prices
+// retrieved via the DescribeSpotPriceHistory API. Entries are cached for
+// SpotPriceHistoryCacheAge and include negative caching for errors.
+type SpotPriceHistoryCache struct {
+	cache          map[SpotPriceHistoryKey]*SpotPriceHistoryEntry
+	mutex          sync.Mutex
+	refreshRunning map[SpotPriceHistoryKey]bool
+	refreshCond    *sync.Cond
+
+	fetcher SpotPriceHistoryFetcher
+}
+
+func NewSpotPriceHistoryCache(fetcher SpotPriceHistoryFetcher) *SpotPriceHistoryCache {
+	cache := &SpotPriceHistoryCache{
+		cache:          make(map[SpotPriceHistoryKey]*SpotPriceHistoryEntry),
+		refreshRunning: make(map[SpotPriceHistoryKey]bool),
+
+		fetcher: fetcher,
+	}
+	cache.refreshCond = sync.NewCond(&cache.mutex)
+	return cache
+}
+
+// GetSpotPrice returns the cached spot price for the given region, instance type,
+// and availability zone. If the cache entry is missing or stale, it fetches a
+// fresh value from the underlying SpotPriceHistoryFetcher.
+func (sph *SpotPriceHistoryCache) GetSpotPrice(region, instanceType, availabilityZone string) (*SpotPriceHistoryEntry, error) {
+	key := SpotPriceHistoryKey{
+		Region:           region,
+		InstanceType:     instanceType,
+		AvailabilityZone: availabilityZone,
+	}
+	sph.mutex.Lock()
+	for sph.refreshRunning[key] {
+		sph.refreshCond.Wait()
+	}
+	// Check if we have cached price. If so, return it.
+	entry, exists := sph.cache[key]
+	if exists && !entry.shouldRefresh() {
+		sph.mutex.Unlock()
+		return entry, entry.Error
+	}
+	// Either a cache entry does not exist or it is stale. Refresh it.
+	sph.refreshRunning[key] = true
+	sph.mutex.Unlock()
+
+	// Ensure refreshRunning is always cleared, even if the fetcher panics.
+	defer func() {
+		sph.mutex.Lock()
+		delete(sph.refreshRunning, key)
+		sph.refreshCond.Broadcast()
+		sph.mutex.Unlock()
+	}()
+
+	// Fetch the entry
+	entry, err := sph.fetcher.FetchSpotPrice(key)
+	if err != nil || entry == nil {
+		// If we fail to fetch or get a nil entry, create a negative cache entry.
+		if err == nil {
+			err = fmt.Errorf("fetcher returned nil entry for %s", key)
+		}
+		entry = &SpotPriceHistoryEntry{
+			RetrievedAt: time.Now(),
+			Error:       err,
+		}
+	} else {
+		// Normalize cache metadata so cache freshness does not depend on
+		// the fetcher setting these fields correctly.
+		entry.RetrievedAt = time.Now()
+		entry.Error = nil
+	}
+
+	// Store it into the cache
+	sph.mutex.Lock()
+	sph.cache[key] = entry
+	sph.mutex.Unlock()
+	return entry, entry.Error
+}
+
+// SpotPriceHistoryFetcher is the interface for fetching spot prices from the
+// DescribeSpotPriceHistory API (or a mock for testing).
+type SpotPriceHistoryFetcher interface {
+	FetchSpotPrice(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error)
+}
+
+// AWSSpotPriceHistoryFetcher implements SpotPriceHistoryFetcher using the real
+// AWS EC2 DescribeSpotPriceHistory API. It maintains a pool of per-region
+// EC2 clients.
+type AWSSpotPriceHistoryFetcher struct {
+	awsConfig       awsSDK.Config
+	ec2ClientsMutex sync.Mutex
+	ec2Clients      map[string]*ec2.Client
+}
+
+func NewAWSSpotPriceHistoryFetcher(awsConfig awsSDK.Config) *AWSSpotPriceHistoryFetcher {
+	return &AWSSpotPriceHistoryFetcher{
+		awsConfig:  awsConfig,
+		ec2Clients: make(map[string]*ec2.Client),
+	}
+}
+
+func (a *AWSSpotPriceHistoryFetcher) getEC2Client(region string) *ec2.Client {
+	a.ec2ClientsMutex.Lock()
+	defer a.ec2ClientsMutex.Unlock()
+	if client, ok := a.ec2Clients[region]; ok {
+		return client
+	}
+	config := a.awsConfig
+	config.Region = region
+	client := ec2.NewFromConfig(config)
+	a.ec2Clients[region] = client
+	return client
+}
+
+func (a *AWSSpotPriceHistoryFetcher) FetchSpotPrice(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+	log.Debugf("Retrieving spot price history for %s", key)
+	client := a.getEC2Client(key.Region)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	input := &ec2.DescribeSpotPriceHistoryInput{
+		InstanceTypes:    []ec2Types.InstanceType{ec2Types.InstanceType(key.InstanceType)},
+		AvailabilityZone: awsSDK.String(key.AvailabilityZone),
+		// Only retrieve Linux/UNIX (Amazon VPC) prices. The non-VPC
+		// "Linux/UNIX" variant was for EC2-Classic, which was fully retired in
+		// August 2023.
+		ProductDescriptions: []string{
+			"Linux/UNIX (Amazon VPC)",
+		},
+		// Only retrieve the latest price.
+		MaxResults: awsSDK.Int32(1),
+	}
+
+	resp, err := client.DescribeSpotPriceHistory(ctx, input)
+	if err != nil {
+		return nil, fmt.Errorf("describing spot price history for %s: %w", key, err)
+	}
+	if len(resp.SpotPriceHistory) == 0 {
+		return nil, fmt.Errorf("no spot price history found for %s", key)
+	}
+	spotPrice := resp.SpotPriceHistory[0]
+
+	if spotPrice.SpotPrice == nil || spotPrice.Timestamp == nil {
+		return nil, fmt.Errorf("missing required spot price history data for %s (SpotPrice=%v, Timestamp=%v)", key, spotPrice.SpotPrice, spotPrice.Timestamp)
+	}
+	price, err := strconv.ParseFloat(*spotPrice.SpotPrice, 64)
+	if err != nil {
+		return nil, fmt.Errorf("parsing spot price: %w", err)
+	}
+	return &SpotPriceHistoryEntry{
+		SpotPrice:   price,
+		Timestamp:   *spotPrice.Timestamp,
+		RetrievedAt: time.Now(),
+	}, nil
+}

+ 239 - 0
pkg/cloud/aws/spotpricehistory_test.go

@@ -0,0 +1,239 @@
+package aws
+
+import (
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+type mockSpotPriceHistoryFetcher struct {
+	fetchFunc func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error)
+}
+
+func (m *mockSpotPriceHistoryFetcher) FetchSpotPrice(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+	if m.fetchFunc != nil {
+		return m.fetchFunc(key)
+	}
+	return &SpotPriceHistoryEntry{
+		SpotPrice:   0.05,
+		Timestamp:   time.Now(),
+		RetrievedAt: time.Now(),
+	}, nil
+}
+
+func TestSpotPriceHistoryCache_GetSpotPrice_CacheHit(t *testing.T) {
+	mockFetcher := &mockSpotPriceHistoryFetcher{}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	region := "us-west-2"
+	instanceType := "m5.large"
+	availabilityZone := "us-west-2a"
+
+	key := SpotPriceHistoryKey{
+		Region:           region,
+		InstanceType:     instanceType,
+		AvailabilityZone: availabilityZone,
+	}
+
+	cachedEntry := &SpotPriceHistoryEntry{
+		SpotPrice:   0.08,
+		Timestamp:   time.Now(),
+		RetrievedAt: time.Now(),
+	}
+	cache.cache[key] = cachedEntry
+
+	entry, err := cache.GetSpotPrice(region, instanceType, availabilityZone)
+	if err != nil {
+		t.Errorf("Expected no error, got %v", err)
+	}
+	if entry.SpotPrice != 0.08 {
+		t.Errorf("Expected spot price 0.08, got %f", entry.SpotPrice)
+	}
+}
+
+func TestSpotPriceHistoryCache_GetSpotPrice_CacheMiss(t *testing.T) {
+	fetchCalled := false
+	mockFetcher := &mockSpotPriceHistoryFetcher{
+		fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+			fetchCalled = true
+			return &SpotPriceHistoryEntry{
+				SpotPrice:   0.12,
+				Timestamp:   time.Now(),
+				RetrievedAt: time.Now(),
+			}, nil
+		},
+	}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	entry, err := cache.GetSpotPrice("us-west-2", "m5.large", "us-west-2a")
+	if err != nil {
+		t.Errorf("Expected no error, got %v", err)
+	}
+	if !fetchCalled {
+		t.Error("Expected fetcher to be called for cache miss")
+	}
+	if entry.SpotPrice != 0.12 {
+		t.Errorf("Expected spot price 0.12, got %f", entry.SpotPrice)
+	}
+}
+
+func TestSpotPriceHistoryCache_GetSpotPrice_ConcurrentSameKey(t *testing.T) {
+	var fetchCount atomic.Int32
+	mockFetcher := &mockSpotPriceHistoryFetcher{
+		fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+			fetchCount.Add(1)
+			// Simulate slow API call to increase chance of concurrent access
+			time.Sleep(50 * time.Millisecond)
+			return &SpotPriceHistoryEntry{
+				SpotPrice:   0.07,
+				Timestamp:   time.Now(),
+				RetrievedAt: time.Now(),
+			}, nil
+		},
+	}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	const goroutines = 10
+	var wg sync.WaitGroup
+	wg.Add(goroutines)
+	for i := 0; i < goroutines; i++ {
+		go func() {
+			defer wg.Done()
+			entry, err := cache.GetSpotPrice("us-west-2", "m5.large", "us-west-2a")
+			if err != nil {
+				t.Errorf("Expected no error, got %v", err)
+			}
+			if entry.SpotPrice != 0.07 {
+				t.Errorf("Expected spot price 0.07, got %f", entry.SpotPrice)
+			}
+		}()
+	}
+	wg.Wait()
+
+	if count := fetchCount.Load(); count != 1 {
+		t.Errorf("Expected exactly 1 fetch call, got %d", count)
+	}
+}
+
+func TestSpotPriceHistoryCache_GetSpotPrice_StaleEntry(t *testing.T) {
+	fetchCalled := false
+	mockFetcher := &mockSpotPriceHistoryFetcher{
+		fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+			fetchCalled = true
+			return &SpotPriceHistoryEntry{
+				SpotPrice:   0.15,
+				Timestamp:   time.Now(),
+				RetrievedAt: time.Now(),
+			}, nil
+		},
+	}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	key := SpotPriceHistoryKey{
+		Region:           "us-west-2",
+		InstanceType:     "m5.large",
+		AvailabilityZone: "us-west-2a",
+	}
+
+	staleEntry := &SpotPriceHistoryEntry{
+		SpotPrice:   0.08,
+		Timestamp:   time.Now(),
+		RetrievedAt: time.Now().Add(-2 * time.Hour),
+	}
+	cache.cache[key] = staleEntry
+
+	entry, err := cache.GetSpotPrice("us-west-2", "m5.large", "us-west-2a")
+	if err != nil {
+		t.Errorf("Expected no error, got %v", err)
+	}
+	if !fetchCalled {
+		t.Error("Expected fetcher to be called for stale entry")
+	}
+	if entry.SpotPrice != 0.15 {
+		t.Errorf("Expected refreshed spot price 0.15, got %f", entry.SpotPrice)
+	}
+}
+
+func TestSpotPriceHistoryCache_GetSpotPrice_FetchError(t *testing.T) {
+	expectedError := errors.New("fetch failed")
+	mockFetcher := &mockSpotPriceHistoryFetcher{
+		fetchFunc: func(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
+			return nil, expectedError
+		},
+	}
+	cache := NewSpotPriceHistoryCache(mockFetcher)
+
+	_, err := cache.GetSpotPrice("us-west-2", "m5.large", "us-west-2a")
+	if err == nil {
+		t.Error("Expected error from failed fetch")
+	}
+	if !errors.Is(err, expectedError) {
+		t.Errorf("Expected error %v, got %v", expectedError, err)
+	}
+
+	key := SpotPriceHistoryKey{
+		Region:           "us-west-2",
+		InstanceType:     "m5.large",
+		AvailabilityZone: "us-west-2a",
+	}
+	cachedEntry := cache.cache[key]
+	if !errors.Is(cachedEntry.Error, expectedError) {
+		t.Errorf("Expected cached entry error %v, got %v", expectedError, cachedEntry.Error)
+	}
+}
+
+func TestSpotPriceHistoryEntry_shouldRefresh(t *testing.T) {
+	now := time.Now()
+
+	tests := []struct {
+		name        string
+		retrievedAt time.Time
+		expected    bool
+	}{
+		{
+			name:        "fresh entry",
+			retrievedAt: now,
+			expected:    false,
+		},
+		{
+			name:        "stale entry",
+			retrievedAt: now.Add(-2 * time.Hour),
+			expected:    true,
+		},
+		{
+			name:        "borderline entry",
+			retrievedAt: now.Add(-SpotPriceHistoryCacheAge + 1*time.Minute),
+			expected:    false,
+		},
+		{
+			name:        "expired entry",
+			retrievedAt: now.Add(-SpotPriceHistoryCacheAge - 1*time.Minute),
+			expected:    true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			entry := SpotPriceHistoryEntry{
+				RetrievedAt: tt.retrievedAt,
+			}
+			if got := entry.shouldRefresh(); got != tt.expected {
+				t.Errorf("shouldRefresh() = %v, want %v", got, tt.expected)
+			}
+		})
+	}
+}
+
+func TestSpotPriceHistoryKey_String(t *testing.T) {
+	key := SpotPriceHistoryKey{
+		Region:           "us-west-2",
+		InstanceType:     "m5.large",
+		AvailabilityZone: "us-west-2a",
+	}
+	expected := "us-west-2/m5.large/us-west-2a"
+	if got := key.String(); got != expected {
+		t.Errorf("String() = %v, want %v", got, expected)
+	}
+}