clustermap.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package clusters
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. "github.com/kubecost/cost-model/pkg/prom"
  10. "github.com/kubecost/cost-model/pkg/thanos"
  11. "github.com/kubecost/cost-model/pkg/util/retry"
  12. prometheus "github.com/prometheus/client_golang/api"
  13. )
  14. const (
  15. LoadRetries int = 6
  16. LoadRetryDelay time.Duration = 10 * time.Second
  17. )
  18. type ClusterInfo struct {
  19. ID string `json:"id"`
  20. Name string `json:"name"`
  21. Profile string `json:"profile"`
  22. Provider string `json:"provider"`
  23. Provisioner string `json:"provisioner"`
  24. }
  25. // Clone creates a copy of ClusterInfo and returns it
  26. func (ci *ClusterInfo) Clone() *ClusterInfo {
  27. if ci == nil {
  28. return nil
  29. }
  30. return &ClusterInfo{
  31. ID: ci.ID,
  32. Name: ci.Name,
  33. Profile: ci.Profile,
  34. Provider: ci.Provider,
  35. Provisioner: ci.Provisioner,
  36. }
  37. }
  38. type ClusterMap interface {
  39. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  40. GetClusterIDs() []string
  41. // AsMap returns the cluster map as a standard go map
  42. AsMap() map[string]*ClusterInfo
  43. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  44. // doesn't exist
  45. InfoFor(clusterID string) *ClusterInfo
  46. // NameFor returns the name of the cluster provided the clusterID.
  47. NameFor(clusterID string) string
  48. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  49. // assigned name. Otherwise, just the clusterID is returned.
  50. NameIDFor(clusterID string) string
  51. // SplitNameID splits the nameID back into a separate id and name field
  52. SplitNameID(nameID string) (id string, name string)
  53. // StopRefresh stops the automatic internal map refresh
  54. StopRefresh()
  55. }
  56. // ClusterMap keeps records of all known cost-model clusters.
  57. type PrometheusClusterMap struct {
  58. lock *sync.RWMutex
  59. client prometheus.Client
  60. clusters map[string]*ClusterInfo
  61. stop chan struct{}
  62. }
  63. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  64. func NewClusterMap(client prometheus.Client, refresh time.Duration) ClusterMap {
  65. stop := make(chan struct{})
  66. cm := &PrometheusClusterMap{
  67. lock: new(sync.RWMutex),
  68. client: client,
  69. clusters: make(map[string]*ClusterInfo),
  70. stop: stop,
  71. }
  72. // Run an updater to ensure cluster data stays relevant over time
  73. go func() {
  74. // Immediately Attempt to refresh the clusters
  75. cm.refreshClusters()
  76. // Tick on interval and refresh clusters
  77. ticker := time.NewTicker(refresh)
  78. for {
  79. select {
  80. case <-ticker.C:
  81. cm.refreshClusters()
  82. case <-cm.stop:
  83. log.Infof("ClusterMap refresh stopped.")
  84. return
  85. }
  86. }
  87. }()
  88. return cm
  89. }
  90. // clusterInfoQuery returns the query string to load cluster info
  91. func clusterInfoQuery(offset string) string {
  92. return fmt.Sprintf("kubecost_cluster_info%s", offset)
  93. }
  94. // loadClusters loads all the cluster info to map
  95. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
  96. var offset string = ""
  97. if prom.IsThanos(pcm.client) {
  98. offset = thanos.QueryOffset()
  99. }
  100. // Execute Query
  101. tryQuery := func() (interface{}, error) {
  102. ctx := prom.NewContext(pcm.client)
  103. r, _, e := ctx.QuerySync(clusterInfoQuery(offset))
  104. return r, e
  105. }
  106. // Retry on failure
  107. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  108. qr, ok := result.([]*prom.QueryResult)
  109. if !ok || err != nil {
  110. return nil, err
  111. }
  112. clusters := make(map[string]*ClusterInfo)
  113. // Load the query results. Critical fields are id and name.
  114. for _, result := range qr {
  115. id, err := result.GetString("id")
  116. if err != nil {
  117. log.Warningf("Failed to load 'id' field for ClusterInfo")
  118. continue
  119. }
  120. name, err := result.GetString("name")
  121. if err != nil {
  122. log.Warningf("Failed to load 'name' field for ClusterInfo")
  123. continue
  124. }
  125. profile, err := result.GetString("clusterprofile")
  126. if err != nil {
  127. profile = ""
  128. }
  129. provider, err := result.GetString("provider")
  130. if err != nil {
  131. provider = ""
  132. }
  133. provisioner, err := result.GetString("provisioner")
  134. if err != nil {
  135. provisioner = ""
  136. }
  137. clusters[id] = &ClusterInfo{
  138. ID: id,
  139. Name: name,
  140. Profile: profile,
  141. Provider: provider,
  142. Provisioner: provisioner,
  143. }
  144. }
  145. return clusters, nil
  146. }
  147. // refreshClusters loads the clusters and updates the internal map
  148. func (pcm *PrometheusClusterMap) refreshClusters() {
  149. updated, err := pcm.loadClusters()
  150. if err != nil {
  151. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  152. return
  153. }
  154. pcm.lock.Lock()
  155. pcm.clusters = updated
  156. pcm.lock.Unlock()
  157. }
  158. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  159. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  160. pcm.lock.RLock()
  161. defer pcm.lock.RUnlock()
  162. var clusterIDs []string
  163. for id := range pcm.clusters {
  164. clusterIDs = append(clusterIDs, id)
  165. }
  166. return clusterIDs
  167. }
  168. // AsMap returns the cluster map as a standard go map
  169. func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
  170. pcm.lock.RLock()
  171. defer pcm.lock.RUnlock()
  172. m := make(map[string]*ClusterInfo)
  173. for k, v := range pcm.clusters {
  174. m[k] = v.Clone()
  175. }
  176. return m
  177. }
  178. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  179. // doesn't exist
  180. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
  181. pcm.lock.RLock()
  182. defer pcm.lock.RUnlock()
  183. if info, ok := pcm.clusters[clusterID]; ok {
  184. return info.Clone()
  185. }
  186. return nil
  187. }
  188. // NameFor returns the name of the cluster provided the clusterID.
  189. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  190. pcm.lock.RLock()
  191. defer pcm.lock.RUnlock()
  192. if info, ok := pcm.clusters[clusterID]; ok {
  193. return info.Name
  194. }
  195. return ""
  196. }
  197. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  198. // assigned name. Otherwise, just the clusterID is returned.
  199. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  200. pcm.lock.RLock()
  201. defer pcm.lock.RUnlock()
  202. if info, ok := pcm.clusters[clusterID]; ok {
  203. if info.Name == "" {
  204. return clusterID
  205. }
  206. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  207. }
  208. return clusterID
  209. }
  210. func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
  211. if !strings.Contains(nameID, "/") {
  212. id = nameID
  213. name = ""
  214. return
  215. }
  216. split := strings.Split(nameID, "/")
  217. name = split[0]
  218. id = split[1]
  219. return
  220. }
  221. // StopRefresh stops the automatic internal map refresh
  222. func (pcm *PrometheusClusterMap) StopRefresh() {
  223. if pcm.stop != nil {
  224. close(pcm.stop)
  225. pcm.stop = nil
  226. }
  227. }