settings.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/pkg/cloud/models"
  7. "github.com/patrickmn/go-cache"
  8. )
  9. // InitializeSettingsPubSub sets up the pub/sub mechanisms and kicks of
  10. // routines to detect and publish changes, as well as some routines that
  11. // subscribe and take actions.
  12. func (a *Accesses) InitializeSettingsPubSub() {
  13. a.settingsSubscribers = map[string][]chan string{}
  14. // Publish settings changes
  15. go func(a *Accesses) {
  16. for {
  17. // Publish changes to custom pricing
  18. if a.customPricingHasChanged() {
  19. for _, ch := range a.settingsSubscribers[CustomPricingSetting] {
  20. if data, ok := a.SettingsCache.Get(CustomPricingSetting); ok {
  21. if cpStr, ok := data.(string); ok {
  22. ch <- cpStr
  23. }
  24. }
  25. }
  26. }
  27. // Publish changes to discount
  28. if a.discountHasChanged() {
  29. for _, ch := range a.settingsSubscribers[DiscountSetting] {
  30. if data, ok := a.SettingsCache.Get(DiscountSetting); ok {
  31. if discStr, ok := data.(string); ok {
  32. ch <- discStr
  33. }
  34. }
  35. }
  36. }
  37. time.Sleep(500 * time.Millisecond)
  38. }
  39. }(a)
  40. // Clear caches when custom pricing or discount changes
  41. go func(a *Accesses) {
  42. costDataCacheCh := make(chan string)
  43. a.SubscribeToCustomPricingChanges(costDataCacheCh)
  44. a.SubscribeToDiscountChanges(costDataCacheCh)
  45. for {
  46. msg := <-costDataCacheCh
  47. log.Infof("Flushing cost data caches: %s", msg)
  48. a.CostDataCache.Flush()
  49. }
  50. }(a)
  51. }
  52. // SubscribeToCustomPricingChanges subscribes the given channel to receive
  53. // custom pricing changes.
  54. func (a *Accesses) SubscribeToCustomPricingChanges(ch chan string) {
  55. a.settingsMutex.Lock()
  56. defer a.settingsMutex.Unlock()
  57. a.settingsSubscribers[CustomPricingSetting] = append(a.settingsSubscribers[CustomPricingSetting], ch)
  58. }
  59. // SubscribeToDiscountChanges subscribes the given channel to receive discount
  60. // changes.
  61. func (a *Accesses) SubscribeToDiscountChanges(ch chan string) {
  62. a.settingsMutex.Lock()
  63. defer a.settingsMutex.Unlock()
  64. a.settingsSubscribers[DiscountSetting] = append(a.settingsSubscribers[DiscountSetting], ch)
  65. }
  66. // customPricingHasChanged returns true if custom pricing settings have changed
  67. // since the last time this function was called.
  68. func (a *Accesses) customPricingHasChanged() bool {
  69. customPricing, err := a.CloudProvider.GetConfig()
  70. if err != nil || customPricing == nil {
  71. log.Errorf("error accessing cloud provider configuration: %s", err)
  72. return false
  73. }
  74. // describe parameters by which we determine whether or not custom
  75. // pricing settings have changed
  76. encodeCustomPricing := func(cp *models.CustomPricing) string {
  77. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%s:%s", cp.CustomPricesEnabled, cp.CPU, cp.SpotCPU,
  78. cp.RAM, cp.SpotRAM, cp.GPU, cp.Storage, cp.CurrencyCode, cp.SharedOverhead)
  79. }
  80. // compare cached custom pricing parameters with current values
  81. cpStr := encodeCustomPricing(customPricing)
  82. cpStrCached := ""
  83. val, found := a.SettingsCache.Get(CustomPricingSetting)
  84. if !found {
  85. // if no settings are found (e.g. upon first call) cache custom pricing settings but
  86. // return false, as nothing has "changed" per se
  87. a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.NoExpiration)
  88. return false
  89. }
  90. cpStrCached, ok := val.(string)
  91. if !ok {
  92. log.Errorf("caching error: failed to cast custom pricing to string")
  93. }
  94. if cpStr == cpStrCached {
  95. return false
  96. }
  97. // cache new custom pricing settings
  98. a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.DefaultExpiration)
  99. return true
  100. }
  101. // discountHasChanged returns true if discount settings have changed
  102. // since the last time this function was called.
  103. func (a *Accesses) discountHasChanged() bool {
  104. customPricing, err := a.CloudProvider.GetConfig()
  105. if err != nil || customPricing == nil {
  106. log.Errorf("error accessing cloud provider configuration: %s", err)
  107. return false
  108. }
  109. // describe parameters by which we determine whether or not custom
  110. // pricing settings have changed
  111. encodeDiscount := func(cp *models.CustomPricing) string {
  112. return fmt.Sprintf("%s:%s", cp.Discount, cp.NegotiatedDiscount)
  113. }
  114. // compare cached custom pricing parameters with current values
  115. discStr := encodeDiscount(customPricing)
  116. discStrCached := ""
  117. val, found := a.SettingsCache.Get(DiscountSetting)
  118. if !found {
  119. // if no settings are found (e.g. upon first call) cache custom pricing settings but
  120. // return false, as nothing has "changed" per se
  121. a.SettingsCache.Set(DiscountSetting, discStr, cache.NoExpiration)
  122. return false
  123. }
  124. discStrCached, ok := val.(string)
  125. if !ok {
  126. log.Errorf("caching error: failed to cast discount to string")
  127. }
  128. if discStr == discStrCached {
  129. return false
  130. }
  131. // cache new custom pricing settings
  132. a.SettingsCache.Set(DiscountSetting, discStr, cache.DefaultExpiration)
  133. return true
  134. }