module.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package public
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/model/shared"
  9. "github.com/opencost/opencost/core/pkg/pricing"
  10. "github.com/opencost/opencost/core/pkg/reader"
  11. "github.com/opencost/opencost/core/pkg/unit"
  12. )
  13. // PricingModule must satisfy the pricing.PricingModule interface
  14. var _ pricing.PricingModule = (*PricingModule)(nil)
  15. type PricingModuleConfig struct {
  16. Provider shared.Provider
  17. Currency unit.Currency
  18. RefreshInterval time.Duration
  19. }
  20. type PricingModule struct {
  21. config PricingModuleConfig
  22. Providers *ProviderPricing `json:"provider" yaml:"provider"`
  23. pricingSet *pricing.PricingSet
  24. mu sync.RWMutex
  25. stopCh chan struct{}
  26. doneCh chan struct{}
  27. }
  28. func NewPricingModule(config PricingModuleConfig) (*PricingModule, error) {
  29. pm := &PricingModule{
  30. config: config,
  31. Providers: &ProviderPricing{},
  32. stopCh: make(chan struct{}),
  33. doneCh: make(chan struct{}),
  34. }
  35. ctx := context.Background()
  36. // Generate pricing data directly from the provider API
  37. pricingSet, err := GeneratePricingForProvider(config.Provider, config.Currency)
  38. if err != nil {
  39. return nil, fmt.Errorf("failed to generate pricing: %w", err)
  40. }
  41. // Store the pricing set for reader access
  42. pm.pricingSet = pricingSet
  43. err = pm.indexPricingSet(ctx, pricingSet)
  44. if err != nil {
  45. return nil, fmt.Errorf("failed to index pricing: %w", err)
  46. }
  47. // Start background refresh if configured
  48. if config.RefreshInterval > 0 {
  49. go pm.backgroundRefresh()
  50. log.Infof("Started background pricing refresh with interval: %v", config.RefreshInterval)
  51. }
  52. return pm, nil
  53. }
  54. type ProviderPricing map[shared.Provider]*InstanceTypePricing
  55. type InstanceTypePricing map[string]*RegionPricing
  56. type RegionPricing map[string]*pricing.Prices
  57. func (pm *PricingModule) indexPricingSet(_ context.Context, pricingSet *pricing.PricingSet) error {
  58. providers := make(ProviderPricing)
  59. // Index nodes
  60. for _, node := range pricingSet.NodePricing {
  61. provider := node.Properties.Provider
  62. instanceType := node.Properties.InstanceType
  63. region := node.Properties.Region
  64. // Instance type map
  65. if providers[provider] == nil {
  66. instanceMap := make(InstanceTypePricing)
  67. providers[provider] = &instanceMap
  68. }
  69. // Region map
  70. if (*providers[provider])[instanceType] == nil {
  71. regionMap := make(RegionPricing)
  72. (*providers[provider])[instanceType] = &regionMap
  73. }
  74. (*(*providers[provider])[instanceType])[region] = &node.Prices
  75. }
  76. // Index volumes
  77. for _, volume := range pricingSet.PersistentVolumePricing {
  78. provider := volume.Properties.Provider
  79. volumeType := string(volume.Properties.VolumeType)
  80. region := volume.Properties.Region
  81. // Instance type map
  82. if providers[provider] == nil {
  83. instanceMap := make(InstanceTypePricing)
  84. providers[provider] = &instanceMap
  85. }
  86. // Region map
  87. if (*providers[provider])[volumeType] == nil {
  88. regionMap := make(RegionPricing)
  89. (*providers[provider])[volumeType] = &regionMap
  90. }
  91. (*(*providers[provider])[volumeType])[region] = &volume.Prices
  92. }
  93. pm.Providers = &providers
  94. log.Infof("Indexed %d node pricing records and %d volume pricing records for provider %s (%s)",
  95. len(pricingSet.NodePricing), len(pricingSet.PersistentVolumePricing), pm.config.Provider, pm.config.Currency)
  96. return nil
  97. }
  98. // GetNodePricing provides fast lookup for node pricing by provider, instance type, and region
  99. func (pm *PricingModule) GetNodePricing(ctx context.Context, props pricing.NodePricingProperties) (*pricing.NodePricing, error) {
  100. if err := ctx.Err(); err != nil {
  101. return nil, err
  102. }
  103. pm.mu.RLock()
  104. defer pm.mu.RUnlock()
  105. if pm.Providers == nil {
  106. return nil, fmt.Errorf("pricing not loaded")
  107. }
  108. provider := props.Provider
  109. instanceType := props.InstanceType
  110. region := props.Region
  111. providerPricing := (*pm.Providers)[provider]
  112. if providerPricing == nil {
  113. return nil, fmt.Errorf("provider %s not found", provider)
  114. }
  115. instancePricing := (*providerPricing)[instanceType]
  116. if instancePricing == nil {
  117. return nil, fmt.Errorf("instance type %s not found for provider %s", instanceType, provider)
  118. }
  119. regionPricing := (*instancePricing)[region]
  120. if regionPricing == nil {
  121. return nil, fmt.Errorf("region %s not found for instance type %s in provider %s", region, instanceType, provider)
  122. }
  123. // Reconstruct NodePricing from Prices
  124. return &pricing.NodePricing{
  125. Properties: pricing.NodePricingProperties{
  126. Provider: provider,
  127. InstanceType: instanceType,
  128. Region: region,
  129. },
  130. Prices: *regionPricing,
  131. }, nil
  132. }
  133. // GetPersistentVolumePricing provides fast lookup for volume pricing by provider, volume type, and region
  134. func (pm *PricingModule) GetPersistentVolumePricing(ctx context.Context, props pricing.PersistentVolumePricingProperties) (*pricing.PersistentVolumePricing, error) {
  135. if err := ctx.Err(); err != nil {
  136. return nil, err
  137. }
  138. pm.mu.RLock()
  139. defer pm.mu.RUnlock()
  140. if pm.Providers == nil {
  141. return nil, fmt.Errorf("pricing not loaded")
  142. }
  143. provider := props.Provider
  144. volumeType := string(props.VolumeType)
  145. region := props.Region
  146. providerPricing := (*pm.Providers)[provider]
  147. if providerPricing == nil {
  148. return nil, fmt.Errorf("provider %s not found", provider)
  149. }
  150. instancePricing := (*providerPricing)[volumeType]
  151. if instancePricing == nil {
  152. return nil, fmt.Errorf("volume type %s not found for provider %s", volumeType, provider)
  153. }
  154. regionPricing := (*instancePricing)[region]
  155. if regionPricing == nil {
  156. return nil, fmt.Errorf("region %s not found for volume type %s in provider %s", region, volumeType, provider)
  157. }
  158. // Reconstruct NodePricing from Prices
  159. return &pricing.PersistentVolumePricing{
  160. Properties: pricing.PersistentVolumePricingProperties{
  161. Provider: provider,
  162. VolumeType: pricing.VolumeType(volumeType),
  163. Region: region,
  164. },
  165. Prices: *regionPricing,
  166. }, nil
  167. }
  168. func (pm *PricingModule) NewNodePricingReader(ctx context.Context) (reader.Reader[*pricing.NodePricing], error) {
  169. pm.mu.RLock()
  170. defer pm.mu.RUnlock()
  171. return reader.NewSliceReader(pm.pricingSet.NodePricing), nil
  172. }
  173. func (pm *PricingModule) NewPersistentVolumePricingReader(ctx context.Context) (reader.Reader[*pricing.PersistentVolumePricing], error) {
  174. pm.mu.RLock()
  175. defer pm.mu.RUnlock()
  176. return reader.NewSliceReader(pm.pricingSet.PersistentVolumePricing), nil
  177. }
  178. // GetClusterPricing returns cluster pricing matching the given provider.
  179. func (pm *PricingModule) GetClusterPricing(ctx context.Context, props pricing.ClusterPricingProperties) (*pricing.ClusterPricing, error) {
  180. if err := ctx.Err(); err != nil {
  181. return nil, err
  182. }
  183. pm.mu.RLock()
  184. defer pm.mu.RUnlock()
  185. if pm.pricingSet == nil {
  186. return nil, fmt.Errorf("pricing not loaded")
  187. }
  188. for _, cp := range pm.pricingSet.ClusterPricing {
  189. if cp.Properties.Provider == props.Provider {
  190. return cp, nil
  191. }
  192. }
  193. return nil, fmt.Errorf("cluster pricing not found for provider=%s", props.Provider)
  194. }
  195. func (pm *PricingModule) NewClusterPricingReader(ctx context.Context) (reader.Reader[*pricing.ClusterPricing], error) {
  196. pm.mu.RLock()
  197. defer pm.mu.RUnlock()
  198. return reader.NewSliceReader(pm.pricingSet.ClusterPricing), nil
  199. }
  200. // GetNetworkPricing returns network pricing matching the given provider, traffic
  201. // direction, traffic type, and NAT gateway flag.
  202. func (pm *PricingModule) GetNetworkPricing(ctx context.Context, props pricing.NetworkPricingProperties) (*pricing.NetworkPricing, error) {
  203. if err := ctx.Err(); err != nil {
  204. return nil, err
  205. }
  206. pm.mu.RLock()
  207. defer pm.mu.RUnlock()
  208. if pm.pricingSet == nil {
  209. return nil, fmt.Errorf("pricing not loaded")
  210. }
  211. for _, np := range pm.pricingSet.NetworkPricing {
  212. if np.Properties.Provider == props.Provider &&
  213. np.Properties.TrafficDirection == props.TrafficDirection &&
  214. np.Properties.TrafficType == props.TrafficType &&
  215. np.Properties.IsNatGateway == props.IsNatGateway {
  216. return np, nil
  217. }
  218. }
  219. return nil, fmt.Errorf("network pricing not found for provider=%s, trafficDirection=%s, trafficType=%s, isNatGateway=%t",
  220. props.Provider, props.TrafficDirection, props.TrafficType, props.IsNatGateway)
  221. }
  222. func (pm *PricingModule) NewNetworkPricingReader(ctx context.Context) (reader.Reader[*pricing.NetworkPricing], error) {
  223. pm.mu.RLock()
  224. defer pm.mu.RUnlock()
  225. return reader.NewSliceReader(pm.pricingSet.NetworkPricing), nil
  226. }
  227. // GetServicePricing returns service pricing matching the given provider and region.
  228. func (pm *PricingModule) GetServicePricing(ctx context.Context, props pricing.ServicePricingProperties) (*pricing.ServicePricing, error) {
  229. if err := ctx.Err(); err != nil {
  230. return nil, err
  231. }
  232. pm.mu.RLock()
  233. defer pm.mu.RUnlock()
  234. if pm.pricingSet == nil {
  235. return nil, fmt.Errorf("pricing not loaded")
  236. }
  237. for _, sp := range pm.pricingSet.ServicePricing {
  238. if sp.Properties.Provider == props.Provider &&
  239. sp.Properties.Region == props.Region {
  240. return sp, nil
  241. }
  242. }
  243. return nil, fmt.Errorf("service pricing not found for provider=%s, region=%s", props.Provider, props.Region)
  244. }
  245. func (pm *PricingModule) NewServicePricingReader(ctx context.Context) (reader.Reader[*pricing.ServicePricing], error) {
  246. pm.mu.RLock()
  247. defer pm.mu.RUnlock()
  248. return reader.NewSliceReader(pm.pricingSet.ServicePricing), nil
  249. }
  250. // GetPricingSet returns the current in-memory pricing set
  251. func (pm *PricingModule) GetPricingSet(ctx context.Context) (*pricing.PricingSet, error) {
  252. if err := ctx.Err(); err != nil {
  253. return nil, err
  254. }
  255. pm.mu.RLock()
  256. defer pm.mu.RUnlock()
  257. return pm.pricingSet, nil
  258. }
  259. // TODO: Make this a const? This string is correct, but is also defined in KCM.
  260. func (pm *PricingModule) SourceKind() string {
  261. return "public"
  262. }
  263. // TODO: This seems like a reasonable choice for a source name... but let's think about it a bit more.
  264. func (pm *PricingModule) SourceName() string {
  265. return string(pm.config.Provider)
  266. }
  267. func (pm *PricingModule) Checksum(ctx context.Context) (string, error) {
  268. pm.mu.RLock()
  269. defer pm.mu.RUnlock()
  270. if pm.pricingSet == nil {
  271. return "", fmt.Errorf("pricing not loaded")
  272. }
  273. return pm.pricingSet.Checksum()
  274. }
  275. // ComparePricingSet compares the current in-memory pricing set with a new one
  276. // Returns true if they are identical, false if different
  277. func (pm *PricingModule) ComparePricingSet(newPricingSet *pricing.PricingSet) (bool, error) {
  278. pm.mu.RLock()
  279. defer pm.mu.RUnlock()
  280. sum, err := pm.pricingSet.Checksum()
  281. if err != nil {
  282. return false, fmt.Errorf("failed to checksum current pricing set: %w", err)
  283. }
  284. newSum, err := newPricingSet.Checksum()
  285. if err != nil {
  286. return false, fmt.Errorf("failed to serialize new pricing set: %w", err)
  287. }
  288. return sum == newSum, nil
  289. }
  290. // UpdatePricingSet replaces the current pricing set with a new one and re-indexes it
  291. func (pm *PricingModule) UpdatePricingSet(ctx context.Context, newPricingSet *pricing.PricingSet) error {
  292. if newPricingSet == nil {
  293. return fmt.Errorf("new pricing set is nil")
  294. }
  295. pm.mu.Lock()
  296. defer pm.mu.Unlock()
  297. // Store the new pricing set
  298. pm.pricingSet = newPricingSet
  299. // Re-index the pricing data
  300. err := pm.indexPricingSet(ctx, newPricingSet)
  301. if err != nil {
  302. return fmt.Errorf("failed to index new pricing set: %w", err)
  303. }
  304. log.Infof("Updated pricing set: %d node pricing records and %d volume pricing records",
  305. len(newPricingSet.NodePricing), len(newPricingSet.PersistentVolumePricing))
  306. return nil
  307. }
  308. // backgroundRefresh periodically fetches new pricing data and updates the module
  309. func (pm *PricingModule) backgroundRefresh() {
  310. defer close(pm.doneCh)
  311. ticker := time.NewTicker(pm.config.RefreshInterval)
  312. defer ticker.Stop()
  313. for {
  314. select {
  315. case <-ticker.C:
  316. log.Infof("Starting scheduled pricing refresh for %s (%s)", pm.config.Provider, pm.config.Currency)
  317. // Fetch new pricing data
  318. newPricingSet, err := GeneratePricingForProvider(pm.config.Provider, pm.config.Currency)
  319. if err != nil {
  320. log.Errorf("Failed to refresh pricing data: %v", err)
  321. continue
  322. }
  323. // Compare with existing data
  324. isIdentical, err := pm.ComparePricingSet(newPricingSet)
  325. if err != nil {
  326. log.Errorf("Failed to compare pricing data: %v", err)
  327. continue
  328. }
  329. if isIdentical {
  330. log.Infof("Pricing data unchanged, skipping update")
  331. continue
  332. }
  333. // Update with new data
  334. ctx := context.Background()
  335. if err := pm.UpdatePricingSet(ctx, newPricingSet); err != nil {
  336. log.Errorf("Failed to update pricing data: %v", err)
  337. continue
  338. }
  339. log.Infof("Successfully refreshed pricing data")
  340. case <-pm.stopCh:
  341. log.Infof("Stopping background pricing refresh")
  342. return
  343. }
  344. }
  345. }
  346. // Stop gracefully stops the background refresh goroutine
  347. func (pm *PricingModule) Stop() {
  348. if pm.config.RefreshInterval > 0 {
  349. close(pm.stopCh)
  350. <-pm.doneCh
  351. log.Infof("Background pricing refresh stopped")
  352. }
  353. }