settings.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/pkg/cloud"
  6. "github.com/opencost/opencost/pkg/log"
  7. "github.com/patrickmn/go-cache"
  8. )
  9. // InitializeSettingsPubSub sets up the pub/sub mechanisms and kicks of
  10. // routines to detect and publish changes, as well as some routines that
  11. // subscribe and take actions.
  12. func (a *Accesses) InitializeSettingsPubSub() {
  13. a.settingsSubscribers = map[string][]chan string{}
  14. // Publish settings changes
  15. go func(a *Accesses) {
  16. for {
  17. // Publish changes to custom pricing
  18. if a.customPricingHasChanged() {
  19. for _, ch := range a.settingsSubscribers[CustomPricingSetting] {
  20. if data, ok := a.SettingsCache.Get(CustomPricingSetting); ok {
  21. if cpStr, ok := data.(string); ok {
  22. ch <- cpStr
  23. }
  24. }
  25. }
  26. }
  27. // Publish changes to discount
  28. if a.discountHasChanged() {
  29. for _, ch := range a.settingsSubscribers[DiscountSetting] {
  30. if data, ok := a.SettingsCache.Get(DiscountSetting); ok {
  31. if discStr, ok := data.(string); ok {
  32. ch <- discStr
  33. }
  34. }
  35. }
  36. }
  37. time.Sleep(500 * time.Millisecond)
  38. }
  39. }(a)
  40. // Clear caches when custom pricing or discount changes
  41. go func(a *Accesses) {
  42. costDataCacheCh := make(chan string)
  43. a.SubscribeToCustomPricingChanges(costDataCacheCh)
  44. a.SubscribeToDiscountChanges(costDataCacheCh)
  45. for {
  46. msg := <-costDataCacheCh
  47. log.Infof("Flushing cost data caches: %s", msg)
  48. a.AggregateCache.Flush()
  49. a.CostDataCache.Flush()
  50. }
  51. }(a)
  52. }
  53. // SubscribeToCustomPricingChanges subscribes the given channel to receive
  54. // custom pricing changes.
  55. func (a *Accesses) SubscribeToCustomPricingChanges(ch chan string) {
  56. a.settingsMutex.Lock()
  57. defer a.settingsMutex.Unlock()
  58. a.settingsSubscribers[CustomPricingSetting] = append(a.settingsSubscribers[CustomPricingSetting], ch)
  59. }
  60. // SubscribeToDiscountChanges subscribes the given channel to receive discount
  61. // changes.
  62. func (a *Accesses) SubscribeToDiscountChanges(ch chan string) {
  63. a.settingsMutex.Lock()
  64. defer a.settingsMutex.Unlock()
  65. a.settingsSubscribers[DiscountSetting] = append(a.settingsSubscribers[DiscountSetting], ch)
  66. }
  67. // customPricingHasChanged returns true if custom pricing settings have changed
  68. // since the last time this function was called.
  69. func (a *Accesses) customPricingHasChanged() bool {
  70. customPricing, err := a.CloudProvider.GetConfig()
  71. if err != nil || customPricing == nil {
  72. log.Errorf("error accessing cloud provider configuration: %s", err)
  73. return false
  74. }
  75. // describe parameters by which we determine whether or not custom
  76. // pricing settings have changed
  77. encodeCustomPricing := func(cp *cloud.CustomPricing) string {
  78. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%s:%s", cp.CustomPricesEnabled, cp.CPU, cp.SpotCPU,
  79. cp.RAM, cp.SpotRAM, cp.GPU, cp.Storage, cp.CurrencyCode, cp.SharedOverhead)
  80. }
  81. // compare cached custom pricing parameters with current values
  82. cpStr := encodeCustomPricing(customPricing)
  83. cpStrCached := ""
  84. val, found := a.SettingsCache.Get(CustomPricingSetting)
  85. if !found {
  86. // if no settings are found (e.g. upon first call) cache custom pricing settings but
  87. // return false, as nothing has "changed" per se
  88. a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.NoExpiration)
  89. return false
  90. }
  91. cpStrCached, ok := val.(string)
  92. if !ok {
  93. log.Errorf("caching error: failed to cast custom pricing to string")
  94. }
  95. if cpStr == cpStrCached {
  96. return false
  97. }
  98. // cache new custom pricing settings
  99. a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.DefaultExpiration)
  100. return true
  101. }
  102. // discountHasChanged returns true if discount settings have changed
  103. // since the last time this function was called.
  104. func (a *Accesses) discountHasChanged() bool {
  105. customPricing, err := a.CloudProvider.GetConfig()
  106. if err != nil || customPricing == nil {
  107. log.Errorf("error accessing cloud provider configuration: %s", err)
  108. return false
  109. }
  110. // describe parameters by which we determine whether or not custom
  111. // pricing settings have changed
  112. encodeDiscount := func(cp *cloud.CustomPricing) string {
  113. return fmt.Sprintf("%s:%s", cp.Discount, cp.NegotiatedDiscount)
  114. }
  115. // compare cached custom pricing parameters with current values
  116. discStr := encodeDiscount(customPricing)
  117. discStrCached := ""
  118. val, found := a.SettingsCache.Get(DiscountSetting)
  119. if !found {
  120. // if no settings are found (e.g. upon first call) cache custom pricing settings but
  121. // return false, as nothing has "changed" per se
  122. a.SettingsCache.Set(DiscountSetting, discStr, cache.NoExpiration)
  123. return false
  124. }
  125. discStrCached, ok := val.(string)
  126. if !ok {
  127. log.Errorf("caching error: failed to cast discount to string")
  128. }
  129. if discStr == discStrCached {
  130. return false
  131. }
  132. // cache new custom pricing settings
  133. a.SettingsCache.Set(DiscountSetting, discStr, cache.DefaultExpiration)
  134. return true
  135. }