settings.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/kubecost/cost-model/pkg/cloud"
  6. "github.com/kubecost/cost-model/pkg/log"
  7. "github.com/patrickmn/go-cache"
  8. "k8s.io/klog"
  9. )
  10. // InitializeSettingsPubSub sets up the pub/sub mechanisms and kicks of
  11. // routines to detect and publish changes, as well as some routines that
  12. // subscribe and take actions.
  13. func (a *Accesses) InitializeSettingsPubSub() {
  14. a.settingsSubscribers = map[string][]chan string{}
  15. // Publish settings changes
  16. go func(a *Accesses) {
  17. for {
  18. // Publish changes to custom pricing
  19. if a.customPricingHasChanged() {
  20. for _, ch := range a.settingsSubscribers[CustomPricingSetting] {
  21. if data, ok := a.SettingsCache.Get(CustomPricingSetting); ok {
  22. if cpStr, ok := data.(string); ok {
  23. ch <- cpStr
  24. }
  25. }
  26. }
  27. }
  28. // Publish changes to discount
  29. if a.discountHasChanged() {
  30. for _, ch := range a.settingsSubscribers[DiscountSetting] {
  31. if data, ok := a.SettingsCache.Get(DiscountSetting); ok {
  32. if discStr, ok := data.(string); ok {
  33. ch <- discStr
  34. }
  35. }
  36. }
  37. }
  38. time.Sleep(500 * time.Millisecond)
  39. }
  40. }(a)
  41. // Clear caches when custom pricing or discount changes
  42. go func(a *Accesses) {
  43. costDataCacheCh := make(chan string)
  44. a.SubscribeToCustomPricingChanges(costDataCacheCh)
  45. a.SubscribeToDiscountChanges(costDataCacheCh)
  46. for {
  47. msg := <-costDataCacheCh
  48. log.Infof("Flushing cost data caches: %s", msg)
  49. a.AggregateCache.Flush()
  50. a.CostDataCache.Flush()
  51. }
  52. }(a)
  53. }
  54. // SubscribeToCustomPricingChanges subscribes the given channel to receive
  55. // custom pricing changes.
  56. func (a *Accesses) SubscribeToCustomPricingChanges(ch chan string) {
  57. a.settingsMutex.Lock()
  58. defer a.settingsMutex.Unlock()
  59. a.settingsSubscribers[CustomPricingSetting] = append(a.settingsSubscribers[CustomPricingSetting], ch)
  60. }
  61. // SubscribeToDiscountChanges subscribes the given channel to receive discount
  62. // changes.
  63. func (a *Accesses) SubscribeToDiscountChanges(ch chan string) {
  64. a.settingsMutex.Lock()
  65. defer a.settingsMutex.Unlock()
  66. a.settingsSubscribers[DiscountSetting] = append(a.settingsSubscribers[DiscountSetting], ch)
  67. }
  68. // customPricingHasChanged returns true if custom pricing settings have changed
  69. // since the last time this function was called.
  70. func (a *Accesses) customPricingHasChanged() bool {
  71. customPricing, err := a.CloudProvider.GetConfig()
  72. if err != nil || customPricing == nil {
  73. klog.Errorf("error accessing cloud provider configuration: %s", err)
  74. return false
  75. }
  76. // describe parameters by which we determine whether or not custom
  77. // pricing settings have changed
  78. encodeCustomPricing := func(cp *cloud.CustomPricing) string {
  79. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%s:%s", cp.CustomPricesEnabled, cp.CPU, cp.SpotCPU,
  80. cp.RAM, cp.SpotRAM, cp.GPU, cp.Storage, cp.CurrencyCode, cp.SharedOverhead)
  81. }
  82. // compare cached custom pricing parameters with current values
  83. cpStr := encodeCustomPricing(customPricing)
  84. cpStrCached := ""
  85. val, found := a.SettingsCache.Get(CustomPricingSetting)
  86. if !found {
  87. // if no settings are found (e.g. upon first call) cache custom pricing settings but
  88. // return false, as nothing has "changed" per se
  89. a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.NoExpiration)
  90. return false
  91. }
  92. cpStrCached, ok := val.(string)
  93. if !ok {
  94. klog.Errorf("caching error: failed to cast custom pricing to string")
  95. }
  96. if cpStr == cpStrCached {
  97. return false
  98. }
  99. // cache new custom pricing settings
  100. a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.DefaultExpiration)
  101. return true
  102. }
  103. // discountHasChanged returns true if discount settings have changed
  104. // since the last time this function was called.
  105. func (a *Accesses) discountHasChanged() bool {
  106. customPricing, err := a.CloudProvider.GetConfig()
  107. if err != nil || customPricing == nil {
  108. klog.Errorf("error accessing cloud provider configuration: %s", err)
  109. return false
  110. }
  111. // describe parameters by which we determine whether or not custom
  112. // pricing settings have changed
  113. encodeDiscount := func(cp *cloud.CustomPricing) string {
  114. return fmt.Sprintf("%s:%s", cp.Discount, cp.NegotiatedDiscount)
  115. }
  116. // compare cached custom pricing parameters with current values
  117. discStr := encodeDiscount(customPricing)
  118. discStrCached := ""
  119. val, found := a.SettingsCache.Get(DiscountSetting)
  120. if !found {
  121. // if no settings are found (e.g. upon first call) cache custom pricing settings but
  122. // return false, as nothing has "changed" per se
  123. a.SettingsCache.Set(DiscountSetting, discStr, cache.NoExpiration)
  124. return false
  125. }
  126. discStrCached, ok := val.(string)
  127. if !ok {
  128. klog.Errorf("caching error: failed to cast discount to string")
  129. }
  130. if discStr == discStrCached {
  131. return false
  132. }
  133. // cache new custom pricing settings
  134. a.SettingsCache.Set(DiscountSetting, discStr, cache.DefaultExpiration)
  135. return true
  136. }