clustermap.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package clusters
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. "github.com/kubecost/cost-model/pkg/prom"
  10. "github.com/kubecost/cost-model/pkg/thanos"
  11. prometheus "github.com/prometheus/client_golang/api"
  12. )
  13. const (
  14. LoadRetries int = 6
  15. LoadRetryDelay time.Duration = 10 * time.Second
  16. )
  17. type ClusterInfo struct {
  18. ID string `json:"id"`
  19. Name string `json:"name"`
  20. Profile string `json:"profile"`
  21. Provider string `json:"provider"`
  22. Provisioner string `json:"provisioner"`
  23. }
  24. // Clone creates a copy of ClusterInfo and returns it
  25. func (ci *ClusterInfo) Clone() *ClusterInfo {
  26. if ci == nil {
  27. return nil
  28. }
  29. return &ClusterInfo{
  30. ID: ci.ID,
  31. Name: ci.Name,
  32. Profile: ci.Profile,
  33. Provider: ci.Provider,
  34. Provisioner: ci.Provisioner,
  35. }
  36. }
  37. type ClusterMap interface {
  38. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  39. GetClusterIDs() []string
  40. // AsMap returns the cluster map as a standard go map
  41. AsMap() map[string]*ClusterInfo
  42. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  43. // doesn't exist
  44. InfoFor(clusterID string) *ClusterInfo
  45. // NameFor returns the name of the cluster provided the clusterID.
  46. NameFor(clusterID string) string
  47. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  48. // assigned name. Otherwise, just the clusterID is returned.
  49. NameIDFor(clusterID string) string
  50. // SplitNameID splits the nameID back into a separate id and name field
  51. SplitNameID(nameID string) (id string, name string)
  52. // StopRefresh stops the automatic internal map refresh
  53. StopRefresh()
  54. }
  55. // ClusterMap keeps records of all known cost-model clusters.
  56. type PrometheusClusterMap struct {
  57. lock *sync.RWMutex
  58. client prometheus.Client
  59. clusters map[string]*ClusterInfo
  60. stop chan struct{}
  61. }
  62. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  63. func NewClusterMap(client prometheus.Client, refresh time.Duration) ClusterMap {
  64. stop := make(chan struct{})
  65. cm := &PrometheusClusterMap{
  66. lock: new(sync.RWMutex),
  67. client: client,
  68. clusters: make(map[string]*ClusterInfo),
  69. stop: stop,
  70. }
  71. // Run an updater to ensure cluster data stays relevant over time
  72. go func() {
  73. // Immediately Attempt to refresh the clusters
  74. cm.refreshClusters()
  75. // Tick on interval and refresh clusters
  76. ticker := time.NewTicker(refresh)
  77. for {
  78. select {
  79. case <-ticker.C:
  80. cm.refreshClusters()
  81. case <-cm.stop:
  82. log.Infof("ClusterMap refresh stopped.")
  83. return
  84. }
  85. }
  86. }()
  87. return cm
  88. }
  89. // clusterInfoQuery returns the query string to load cluster info
  90. func clusterInfoQuery(offset string) string {
  91. return fmt.Sprintf("kubecost_cluster_info%s", offset)
  92. }
  93. // loadClusters loads all the cluster info to map
  94. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
  95. var offset string = ""
  96. if prom.IsThanos(pcm.client) {
  97. offset = thanos.QueryOffset()
  98. }
  99. // Execute Query
  100. tryQuery := func() ([]*prom.QueryResult, prometheus.Warnings, error) {
  101. ctx := prom.NewContext(pcm.client)
  102. return ctx.QuerySync(clusterInfoQuery(offset))
  103. }
  104. var qr []*prom.QueryResult
  105. var err error
  106. // Retry on failure
  107. delay := LoadRetryDelay
  108. for r := LoadRetries; r > 0; r-- {
  109. qr, _, err = tryQuery()
  110. // non-error breaks out of loop
  111. if err == nil {
  112. break
  113. }
  114. // wait the delay
  115. time.Sleep(delay)
  116. // add some random backoff
  117. jitter := time.Duration(rand.Int63n(int64(delay)))
  118. delay = delay + jitter/2
  119. }
  120. if err != nil {
  121. return nil, err
  122. }
  123. clusters := make(map[string]*ClusterInfo)
  124. // Load the query results. Critical fields are id and name.
  125. for _, result := range qr {
  126. id, err := result.GetString("id")
  127. if err != nil {
  128. log.Warningf("Failed to load 'id' field for ClusterInfo")
  129. continue
  130. }
  131. name, err := result.GetString("name")
  132. if err != nil {
  133. log.Warningf("Failed to load 'name' field for ClusterInfo")
  134. continue
  135. }
  136. profile, err := result.GetString("clusterprofile")
  137. if err != nil {
  138. profile = ""
  139. }
  140. provider, err := result.GetString("provider")
  141. if err != nil {
  142. provider = ""
  143. }
  144. provisioner, err := result.GetString("provisioner")
  145. if err != nil {
  146. provisioner = ""
  147. }
  148. clusters[id] = &ClusterInfo{
  149. ID: id,
  150. Name: name,
  151. Profile: profile,
  152. Provider: provider,
  153. Provisioner: provisioner,
  154. }
  155. }
  156. return clusters, nil
  157. }
  158. // refreshClusters loads the clusters and updates the internal map
  159. func (pcm *PrometheusClusterMap) refreshClusters() {
  160. updated, err := pcm.loadClusters()
  161. if err != nil {
  162. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  163. return
  164. }
  165. pcm.lock.Lock()
  166. pcm.clusters = updated
  167. pcm.lock.Unlock()
  168. }
  169. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  170. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  171. pcm.lock.RLock()
  172. defer pcm.lock.RUnlock()
  173. var clusterIDs []string
  174. for id := range pcm.clusters {
  175. clusterIDs = append(clusterIDs, id)
  176. }
  177. return clusterIDs
  178. }
  179. // AsMap returns the cluster map as a standard go map
  180. func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
  181. pcm.lock.RLock()
  182. defer pcm.lock.RUnlock()
  183. m := make(map[string]*ClusterInfo)
  184. for k, v := range pcm.clusters {
  185. m[k] = v.Clone()
  186. }
  187. return m
  188. }
  189. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  190. // doesn't exist
  191. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
  192. pcm.lock.RLock()
  193. defer pcm.lock.RUnlock()
  194. if info, ok := pcm.clusters[clusterID]; ok {
  195. return info.Clone()
  196. }
  197. return nil
  198. }
  199. // NameFor returns the name of the cluster provided the clusterID.
  200. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  201. pcm.lock.RLock()
  202. defer pcm.lock.RUnlock()
  203. if info, ok := pcm.clusters[clusterID]; ok {
  204. return info.Name
  205. }
  206. return ""
  207. }
  208. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  209. // assigned name. Otherwise, just the clusterID is returned.
  210. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  211. pcm.lock.RLock()
  212. defer pcm.lock.RUnlock()
  213. if info, ok := pcm.clusters[clusterID]; ok {
  214. if info.Name == "" {
  215. return clusterID
  216. }
  217. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  218. }
  219. return clusterID
  220. }
  221. func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
  222. if !strings.Contains(nameID, "/") {
  223. id = nameID
  224. name = ""
  225. return
  226. }
  227. split := strings.Split(nameID, "/")
  228. name = split[0]
  229. id = split[1]
  230. return
  231. }
  232. // StopRefresh stops the automatic internal map refresh
  233. func (pcm *PrometheusClusterMap) StopRefresh() {
  234. if pcm.stop != nil {
  235. close(pcm.stop)
  236. pcm.stop = nil
  237. }
  238. }