spotpricehistory.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package aws
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "sync"
  7. "time"
  8. awsSDK "github.com/aws/aws-sdk-go-v2/aws"
  9. "github.com/aws/aws-sdk-go-v2/service/ec2"
  10. ec2Types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. )
  13. // SpotPriceHistoryKey uniquely identifies a spot price lookup by region,
  14. // instance type, and availability zone.
  15. type SpotPriceHistoryKey struct {
  16. Region string
  17. InstanceType string
  18. AvailabilityZone string
  19. }
  20. func (key SpotPriceHistoryKey) String() string {
  21. return fmt.Sprintf("%s/%s/%s", key.Region, key.InstanceType, key.AvailabilityZone)
  22. }
  23. const (
  24. SpotPriceHistoryCacheAge = 1 * time.Hour
  25. )
  26. // SpotPriceHistoryEntry holds a cached spot price from the DescribeSpotPriceHistory API.
  27. type SpotPriceHistoryEntry struct {
  28. SpotPrice float64
  29. Timestamp time.Time
  30. RetrievedAt time.Time
  31. Error error // Negative cache
  32. }
  33. func (spe SpotPriceHistoryEntry) shouldRefresh() bool {
  34. return time.Since(spe.RetrievedAt) > SpotPriceHistoryCacheAge
  35. }
  36. // SpotPriceHistoryCache provides a thread-safe, on-demand cache for spot prices
  37. // retrieved via the DescribeSpotPriceHistory API. Entries are cached for
  38. // SpotPriceHistoryCacheAge and include negative caching for errors.
  39. type SpotPriceHistoryCache struct {
  40. cache map[SpotPriceHistoryKey]*SpotPriceHistoryEntry
  41. mutex sync.Mutex
  42. refreshRunning map[SpotPriceHistoryKey]bool
  43. refreshCond *sync.Cond
  44. fetcher SpotPriceHistoryFetcher
  45. }
  46. func NewSpotPriceHistoryCache(fetcher SpotPriceHistoryFetcher) *SpotPriceHistoryCache {
  47. cache := &SpotPriceHistoryCache{
  48. cache: make(map[SpotPriceHistoryKey]*SpotPriceHistoryEntry),
  49. refreshRunning: make(map[SpotPriceHistoryKey]bool),
  50. fetcher: fetcher,
  51. }
  52. cache.refreshCond = sync.NewCond(&cache.mutex)
  53. return cache
  54. }
  55. // GetSpotPrice returns the cached spot price for the given region, instance type,
  56. // and availability zone. If the cache entry is missing or stale, it fetches a
  57. // fresh value from the underlying SpotPriceHistoryFetcher.
  58. func (sph *SpotPriceHistoryCache) GetSpotPrice(region, instanceType, availabilityZone string) (*SpotPriceHistoryEntry, error) {
  59. key := SpotPriceHistoryKey{
  60. Region: region,
  61. InstanceType: instanceType,
  62. AvailabilityZone: availabilityZone,
  63. }
  64. sph.mutex.Lock()
  65. for sph.refreshRunning[key] {
  66. sph.refreshCond.Wait()
  67. }
  68. // Check if we have cached price. If so, return it.
  69. entry, exists := sph.cache[key]
  70. if exists && !entry.shouldRefresh() {
  71. sph.mutex.Unlock()
  72. return entry, entry.Error
  73. }
  74. // Either a cache entry does not exist or it is stale. Refresh it.
  75. sph.refreshRunning[key] = true
  76. sph.mutex.Unlock()
  77. // Ensure refreshRunning is always cleared, even if the fetcher panics.
  78. defer func() {
  79. sph.mutex.Lock()
  80. delete(sph.refreshRunning, key)
  81. sph.refreshCond.Broadcast()
  82. sph.mutex.Unlock()
  83. }()
  84. // Fetch the entry
  85. entry, err := sph.fetcher.FetchSpotPrice(key)
  86. if err != nil || entry == nil {
  87. // If we fail to fetch or get a nil entry, create a negative cache entry.
  88. if err == nil {
  89. err = fmt.Errorf("fetcher returned nil entry for %s", key)
  90. }
  91. entry = &SpotPriceHistoryEntry{
  92. RetrievedAt: time.Now(),
  93. Error: err,
  94. }
  95. } else {
  96. // Normalize cache metadata so cache freshness does not depend on
  97. // the fetcher setting these fields correctly.
  98. entry.RetrievedAt = time.Now()
  99. entry.Error = nil
  100. }
  101. // Store it into the cache
  102. sph.mutex.Lock()
  103. sph.cache[key] = entry
  104. sph.mutex.Unlock()
  105. return entry, entry.Error
  106. }
  107. // SpotPriceHistoryFetcher is the interface for fetching spot prices from the
  108. // DescribeSpotPriceHistory API (or a mock for testing).
  109. type SpotPriceHistoryFetcher interface {
  110. FetchSpotPrice(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error)
  111. }
  112. // AWSSpotPriceHistoryFetcher implements SpotPriceHistoryFetcher using the real
  113. // AWS EC2 DescribeSpotPriceHistory API. It maintains a pool of per-region
  114. // EC2 clients.
  115. type AWSSpotPriceHistoryFetcher struct {
  116. awsConfig awsSDK.Config
  117. ec2ClientsMutex sync.Mutex
  118. ec2Clients map[string]*ec2.Client
  119. }
  120. func NewAWSSpotPriceHistoryFetcher(awsConfig awsSDK.Config) *AWSSpotPriceHistoryFetcher {
  121. return &AWSSpotPriceHistoryFetcher{
  122. awsConfig: awsConfig,
  123. ec2Clients: make(map[string]*ec2.Client),
  124. }
  125. }
  126. func (a *AWSSpotPriceHistoryFetcher) getEC2Client(region string) *ec2.Client {
  127. a.ec2ClientsMutex.Lock()
  128. defer a.ec2ClientsMutex.Unlock()
  129. if client, ok := a.ec2Clients[region]; ok {
  130. return client
  131. }
  132. config := a.awsConfig
  133. config.Region = region
  134. client := ec2.NewFromConfig(config)
  135. a.ec2Clients[region] = client
  136. return client
  137. }
  138. func (a *AWSSpotPriceHistoryFetcher) FetchSpotPrice(key SpotPriceHistoryKey) (*SpotPriceHistoryEntry, error) {
  139. log.Debugf("Retrieving spot price history for %s", key)
  140. client := a.getEC2Client(key.Region)
  141. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  142. defer cancel()
  143. input := &ec2.DescribeSpotPriceHistoryInput{
  144. InstanceTypes: []ec2Types.InstanceType{ec2Types.InstanceType(key.InstanceType)},
  145. AvailabilityZone: awsSDK.String(key.AvailabilityZone),
  146. // Only retrieve Linux/UNIX (Amazon VPC) prices. The non-VPC
  147. // "Linux/UNIX" variant was for EC2-Classic, which was fully retired in
  148. // August 2023.
  149. ProductDescriptions: []string{
  150. "Linux/UNIX (Amazon VPC)",
  151. },
  152. // Only retrieve the latest price.
  153. MaxResults: awsSDK.Int32(1),
  154. }
  155. resp, err := client.DescribeSpotPriceHistory(ctx, input)
  156. if err != nil {
  157. return nil, fmt.Errorf("describing spot price history for %s: %w", key, err)
  158. }
  159. if len(resp.SpotPriceHistory) == 0 {
  160. return nil, fmt.Errorf("no spot price history found for %s", key)
  161. }
  162. spotPrice := resp.SpotPriceHistory[0]
  163. if spotPrice.SpotPrice == nil || spotPrice.Timestamp == nil {
  164. return nil, fmt.Errorf("missing required spot price history data for %s (SpotPrice=%v, Timestamp=%v)", key, spotPrice.SpotPrice, spotPrice.Timestamp)
  165. }
  166. price, err := strconv.ParseFloat(*spotPrice.SpotPrice, 64)
  167. if err != nil {
  168. return nil, fmt.Errorf("parsing spot price: %w", err)
  169. }
  170. return &SpotPriceHistoryEntry{
  171. SpotPrice: price,
  172. Timestamp: *spotPrice.Timestamp,
  173. RetrievedAt: time.Now(),
  174. }, nil
  175. }