2
0

settings.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/pkg/cloud/models"
  7. "github.com/patrickmn/go-cache"
  8. )
  9. // InitializeSettingsPubSub sets up the pub/sub mechanisms and kicks of
  10. // routines to detect and publish changes, as well as some routines that
  11. // subscribe and take actions.
  12. func (a *Accesses) InitializeSettingsPubSub() {
  13. a.settingsSubscribers = map[string][]chan string{}
  14. // Publish settings changes
  15. go func(a *Accesses) {
  16. for {
  17. // Publish changes to custom pricing
  18. if a.customPricingHasChanged() {
  19. for _, ch := range a.settingsSubscribers[CustomPricingSetting] {
  20. if data, ok := a.SettingsCache.Get(CustomPricingSetting); ok {
  21. if cpStr, ok := data.(string); ok {
  22. ch <- cpStr
  23. }
  24. }
  25. }
  26. }
  27. // Publish changes to discount
  28. if a.discountHasChanged() {
  29. for _, ch := range a.settingsSubscribers[DiscountSetting] {
  30. if data, ok := a.SettingsCache.Get(DiscountSetting); ok {
  31. if discStr, ok := data.(string); ok {
  32. ch <- discStr
  33. }
  34. }
  35. }
  36. }
  37. time.Sleep(500 * time.Millisecond)
  38. }
  39. }(a)
  40. // Clear caches when custom pricing or discount changes
  41. go func(a *Accesses) {
  42. costDataCacheCh := make(chan string)
  43. a.SubscribeToCustomPricingChanges(costDataCacheCh)
  44. a.SubscribeToDiscountChanges(costDataCacheCh)
  45. for {
  46. msg := <-costDataCacheCh
  47. log.Infof("Flushing cost data caches: %s", msg)
  48. }
  49. }(a)
  50. }
  51. // SubscribeToCustomPricingChanges subscribes the given channel to receive
  52. // custom pricing changes.
  53. func (a *Accesses) SubscribeToCustomPricingChanges(ch chan string) {
  54. a.settingsMutex.Lock()
  55. defer a.settingsMutex.Unlock()
  56. a.settingsSubscribers[CustomPricingSetting] = append(a.settingsSubscribers[CustomPricingSetting], ch)
  57. }
  58. // SubscribeToDiscountChanges subscribes the given channel to receive discount
  59. // changes.
  60. func (a *Accesses) SubscribeToDiscountChanges(ch chan string) {
  61. a.settingsMutex.Lock()
  62. defer a.settingsMutex.Unlock()
  63. a.settingsSubscribers[DiscountSetting] = append(a.settingsSubscribers[DiscountSetting], ch)
  64. }
  65. // customPricingHasChanged returns true if custom pricing settings have changed
  66. // since the last time this function was called.
  67. func (a *Accesses) customPricingHasChanged() bool {
  68. customPricing, err := a.CloudProvider.GetConfig()
  69. if err != nil || customPricing == nil {
  70. log.Errorf("error accessing cloud provider configuration: %s", err)
  71. return false
  72. }
  73. // describe parameters by which we determine whether or not custom
  74. // pricing settings have changed
  75. encodeCustomPricing := func(cp *models.CustomPricing) string {
  76. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%s", cp.CustomPricesEnabled, cp.CPU, cp.SpotCPU,
  77. cp.RAM, cp.SpotRAM, cp.GPU, cp.Storage, cp.CurrencyCode)
  78. }
  79. // compare cached custom pricing parameters with current values
  80. cpStr := encodeCustomPricing(customPricing)
  81. cpStrCached := ""
  82. val, found := a.SettingsCache.Get(CustomPricingSetting)
  83. if !found {
  84. // if no settings are found (e.g. upon first call) cache custom pricing settings but
  85. // return false, as nothing has "changed" per se
  86. a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.NoExpiration)
  87. return false
  88. }
  89. cpStrCached, ok := val.(string)
  90. if !ok {
  91. log.Errorf("caching error: failed to cast custom pricing to string")
  92. }
  93. if cpStr == cpStrCached {
  94. return false
  95. }
  96. // cache new custom pricing settings
  97. a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.DefaultExpiration)
  98. return true
  99. }
  100. // discountHasChanged returns true if discount settings have changed
  101. // since the last time this function was called.
  102. func (a *Accesses) discountHasChanged() bool {
  103. customPricing, err := a.CloudProvider.GetConfig()
  104. if err != nil || customPricing == nil {
  105. log.Errorf("error accessing cloud provider configuration: %s", err)
  106. return false
  107. }
  108. // describe parameters by which we determine whether or not custom
  109. // pricing settings have changed
  110. encodeDiscount := func(cp *models.CustomPricing) string {
  111. return fmt.Sprintf("%s:%s", cp.Discount, cp.NegotiatedDiscount)
  112. }
  113. // compare cached custom pricing parameters with current values
  114. discStr := encodeDiscount(customPricing)
  115. discStrCached := ""
  116. val, found := a.SettingsCache.Get(DiscountSetting)
  117. if !found {
  118. // if no settings are found (e.g. upon first call) cache custom pricing settings but
  119. // return false, as nothing has "changed" per se
  120. a.SettingsCache.Set(DiscountSetting, discStr, cache.NoExpiration)
  121. return false
  122. }
  123. discStrCached, ok := val.(string)
  124. if !ok {
  125. log.Errorf("caching error: failed to cast discount to string")
  126. }
  127. if discStr == discStrCached {
  128. return false
  129. }
  130. // cache new custom pricing settings
  131. a.SettingsCache.Set(DiscountSetting, discStr, cache.DefaultExpiration)
  132. return true
  133. }