|
|
@@ -53,6 +53,7 @@ const (
|
|
|
|
|
|
APIPricingSource = "Public API"
|
|
|
SpotPricingSource = "Spot Data Feed"
|
|
|
+ SpotPriceHistorySource = "Spot Price History"
|
|
|
ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
|
|
|
FargatePricingSource = "Fargate"
|
|
|
|
|
|
@@ -96,11 +97,7 @@ func (aws *AWS) PricingSourceStatus() map[string]*models.PricingSource {
|
|
|
Enabled: true,
|
|
|
}
|
|
|
|
|
|
- if !aws.SpotRefreshEnabled() {
|
|
|
- sps.Available = false
|
|
|
- sps.Error = "Spot instances not set up"
|
|
|
- sps.Enabled = false
|
|
|
- } else {
|
|
|
+ if aws.SpotFeedRefreshEnabled() {
|
|
|
sps.Error = ""
|
|
|
if aws.SpotPricingError != nil {
|
|
|
sps.Error = aws.SpotPricingError.Error()
|
|
|
@@ -112,9 +109,28 @@ func (aws *AWS) PricingSourceStatus() map[string]*models.PricingSource {
|
|
|
} else {
|
|
|
sps.Error = "No spot instances detected"
|
|
|
}
|
|
|
+ } else {
|
|
|
+ sps.Available = false
|
|
|
+ sps.Error = "Spot instances not set up"
|
|
|
+ sps.Enabled = false
|
|
|
}
|
|
|
sources[SpotPricingSource] = sps
|
|
|
|
|
|
+ sphs := &models.PricingSource{
|
|
|
+ Name: SpotPriceHistorySource,
|
|
|
+ Enabled: true,
|
|
|
+ }
|
|
|
+ if aws.SpotPriceHistoryError != nil {
|
|
|
+ sphs.Error = aws.SpotPriceHistoryError.Error()
|
|
|
+ sphs.Available = false
|
|
|
+ } else if aws.SpotPriceHistoryCache == nil {
|
|
|
+ sphs.Error = "Not yet initialized"
|
|
|
+ sphs.Available = false
|
|
|
+ } else {
|
|
|
+ sphs.Available = true
|
|
|
+ }
|
|
|
+ sources[SpotPriceHistorySource] = sphs
|
|
|
+
|
|
|
rps := &models.PricingSource{
|
|
|
Name: ReservedInstancePricingSource,
|
|
|
Enabled: true,
|
|
|
@@ -185,6 +201,8 @@ type AWS struct {
|
|
|
SpotRefreshRunning bool
|
|
|
SpotPricingLock sync.RWMutex
|
|
|
SpotPricingError error
|
|
|
+ SpotPriceHistoryCache *SpotPriceHistoryCache
|
|
|
+ SpotPriceHistoryError error
|
|
|
RIPricingByInstanceID map[string]*RIData
|
|
|
RIPricingError error
|
|
|
RIDataRunning bool
|
|
|
@@ -848,8 +866,8 @@ func (aws *AWS) getRegionPricing(nodeList []*clustercache.Node) (*http.Response,
|
|
|
return resp, pricingURL, err
|
|
|
}
|
|
|
|
|
|
-// SpotRefreshEnabled determines whether the required configs to run the spot feed query have been set up
|
|
|
-func (aws *AWS) SpotRefreshEnabled() bool {
|
|
|
+// SpotFeedRefreshEnabled determines whether the required configs to run the spot feed query have been set up
|
|
|
+func (aws *AWS) SpotFeedRefreshEnabled() bool {
|
|
|
// Guard against nil receiver
|
|
|
if aws == nil {
|
|
|
return false
|
|
|
@@ -1019,28 +1037,36 @@ func (aws *AWS) DownloadPricingData() error {
|
|
|
}
|
|
|
log.Infof("Finished downloading \"%s\"", pricingURL)
|
|
|
|
|
|
- if !aws.SpotRefreshEnabled() {
|
|
|
- return nil
|
|
|
+ // Initialize a spot price history cache if not already initialized.
|
|
|
+ // Reset error to allow retrying on subsequent DownloadPricingData calls.
|
|
|
+ if aws.SpotPriceHistoryCache == nil {
|
|
|
+ aws.SpotPriceHistoryError = nil
|
|
|
+ aws.SpotPriceHistoryCache, aws.SpotPriceHistoryError = aws.initializeSpotPriceHistoryCache()
|
|
|
+ if aws.SpotPriceHistoryError != nil {
|
|
|
+ log.Errorf("Failed to initialize spot price history manager: %s", aws.SpotPriceHistoryError)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- // Always run spot pricing refresh when performing download
|
|
|
- aws.refreshSpotPricing(true)
|
|
|
+ if aws.SpotFeedRefreshEnabled() {
|
|
|
+ // Always run spot pricing refresh when performing download
|
|
|
+ aws.refreshSpotPricing(true)
|
|
|
|
|
|
- // Only start a single refresh goroutine
|
|
|
- if !aws.SpotRefreshRunning {
|
|
|
- aws.SpotRefreshRunning = true
|
|
|
+ // Only start a single refresh goroutine
|
|
|
+ if !aws.SpotRefreshRunning {
|
|
|
+ aws.SpotRefreshRunning = true
|
|
|
|
|
|
- go func() {
|
|
|
- defer errs.HandlePanic()
|
|
|
+ go func() {
|
|
|
+ defer errs.HandlePanic()
|
|
|
|
|
|
- for {
|
|
|
- log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
|
|
|
- time.Sleep(SpotRefreshDuration)
|
|
|
+ for {
|
|
|
+ log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
|
|
|
+ time.Sleep(SpotRefreshDuration)
|
|
|
|
|
|
- // Reoccurring refresh checks update times
|
|
|
- aws.refreshSpotPricing(false)
|
|
|
- }
|
|
|
- }()
|
|
|
+ // Reoccurring refresh checks update times
|
|
|
+ aws.refreshSpotPricing(false)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
@@ -1278,6 +1304,60 @@ func (aws *AWS) refreshSpotPricing(force bool) {
|
|
|
aws.SpotPricingByInstanceID = sp
|
|
|
}
|
|
|
|
|
|
+func (aws *AWS) initializeSpotPriceHistoryCache() (*SpotPriceHistoryCache, error) {
|
|
|
+ log.Info("Initializing AWS Spot Price History Manager")
|
|
|
+
|
|
|
+ // Get AWS access key for creating config
|
|
|
+ accessKey, err := aws.GetAWSAccessKey()
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("getting AWS access key for spot price history: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Use the cluster region to create the initial AWS config and credentials.
|
|
|
+ // The SpotPriceHistoryFetcher itself can query multiple regions by creating
|
|
|
+ // region-specific EC2 clients as needed.
|
|
|
+ if aws.ClusterRegion == "" {
|
|
|
+ return nil, fmt.Errorf("no cluster region configured")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create config for the cluster region
|
|
|
+ awsConfig, err := accessKey.CreateConfig(aws.ClusterRegion)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("creating AWS config for spot price history: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return NewSpotPriceHistoryCache(NewAWSSpotPriceHistoryFetcher(awsConfig)), nil
|
|
|
+}
|
|
|
+
|
|
|
+func (aws *AWS) spotPricingFromHistory(k models.Key) (*SpotPriceHistoryEntry, bool) {
|
|
|
+ if aws.SpotPriceHistoryCache == nil {
|
|
|
+ return nil, false
|
|
|
+ }
|
|
|
+
|
|
|
+ // Extract region, instance type, and availability zone from the key
|
|
|
+ awsKey, ok := k.(*awsKey)
|
|
|
+ if !ok {
|
|
|
+ log.DedupedWarningf(10, "Failed to cast key to awsKey for spot price history lookup: %s", k.ID())
|
|
|
+ return nil, false
|
|
|
+ }
|
|
|
+
|
|
|
+ region, regionOk := util.GetRegion(awsKey.Labels)
|
|
|
+ instanceType, instanceTypeOk := util.GetInstanceType(awsKey.Labels)
|
|
|
+ availabilityZone, availabilityZoneOk := util.GetZone(awsKey.Labels)
|
|
|
+ // Skip lookup if any required information is missing
|
|
|
+ if !regionOk || !instanceTypeOk || !availabilityZoneOk {
|
|
|
+ log.DedupedWarningf(10, "Missing required info for spot price history lookup (region: %s, instanceType: %s, zone: %s): %s", region, instanceType, availabilityZone, k.ID())
|
|
|
+ return nil, false
|
|
|
+ }
|
|
|
+
|
|
|
+ price, err := aws.SpotPriceHistoryCache.GetSpotPrice(region, instanceType, availabilityZone)
|
|
|
+ if err != nil {
|
|
|
+ log.DedupedWarningf(10, "Failed to get spot price history for instance %s: %s", k.ID(), err.Error())
|
|
|
+ return nil, false
|
|
|
+ }
|
|
|
+ return price, true
|
|
|
+}
|
|
|
+
|
|
|
// Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
|
|
|
func (aws *AWS) NetworkPricing() (*models.Network, error) {
|
|
|
cpricing, err := aws.Config.GetCustomPricingData()
|
|
|
@@ -1403,11 +1483,29 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
|
|
|
UsageType: PreemptibleType,
|
|
|
}, meta, nil
|
|
|
} else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
|
|
|
- if aws.SpotRefreshEnabled() {
|
|
|
- log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
|
|
|
+ log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
|
|
|
+
|
|
|
+ // Try to get spot pricing from DescribeSpotPriceHistory API
|
|
|
+ if historyEntry, ok := aws.spotPricingFromHistory(k); ok {
|
|
|
+ log.DedupedInfof(5, "Using spot price history data for node %s: $%f", k.ID(), historyEntry.SpotPrice)
|
|
|
+ spotHistoryCost := fmt.Sprintf("%f", historyEntry.SpotPrice)
|
|
|
+ meta.Source = SpotPriceHistorySource
|
|
|
+ return &models.Node{
|
|
|
+ Cost: spotHistoryCost,
|
|
|
+ VCPU: terms.VCpu,
|
|
|
+ RAM: terms.Memory,
|
|
|
+ GPU: terms.GPU,
|
|
|
+ Storage: terms.Storage,
|
|
|
+ BaseCPUPrice: aws.BaseCPUPrice,
|
|
|
+ BaseRAMPrice: aws.BaseRAMPrice,
|
|
|
+ BaseGPUPrice: aws.BaseGPUPrice,
|
|
|
+ UsageType: PreemptibleType,
|
|
|
+ }, meta, nil
|
|
|
}
|
|
|
+
|
|
|
if publicPricingFound {
|
|
|
// return public price if found
|
|
|
+ log.DedupedWarningf(5, "No spot price history available for %s, falling back to on-demand pricing", k.ID())
|
|
|
return &models.Node{
|
|
|
Cost: cost,
|
|
|
VCPU: terms.VCpu,
|
|
|
@@ -1421,9 +1519,7 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
|
|
|
}, meta, nil
|
|
|
} else {
|
|
|
// return defaults if public pricing not found
|
|
|
- if aws.SpotRefreshEnabled() {
|
|
|
- log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
|
|
|
- }
|
|
|
+ log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
|
|
|
return &models.Node{
|
|
|
VCPU: terms.VCpu,
|
|
|
VCPUCost: aws.BaseSpotCPUPrice,
|