clustermap.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. package clusters
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/env"
  9. "github.com/kubecost/cost-model/pkg/log"
  10. "github.com/kubecost/cost-model/pkg/prom"
  11. "github.com/kubecost/cost-model/pkg/thanos"
  12. "github.com/kubecost/cost-model/pkg/util/retry"
  13. prometheus "github.com/prometheus/client_golang/api"
  14. )
  15. const (
  16. LoadRetries int = 6
  17. LoadRetryDelay time.Duration = 10 * time.Second
  18. )
// ClusterInfo describes one cluster tracked by a ClusterMap. Entries are built
// from the kubecost_cluster_info metric labels, or, for the local cluster when
// no metric exists, from a LocalClusterInfoProvider.
type ClusterInfo struct {
	// ID is the cluster identifier and the key used in the cluster map.
	ID string `json:"id"`
	// Name is the cluster's display name; may be empty.
	Name string `json:"name"`
	// Profile comes from the "clusterprofile" metric label ("clusterProfile"
	// key for local info); empty when absent.
	Profile string `json:"profile"`
	// Provider is empty when the source does not report it.
	Provider string `json:"provider"`
	// Provisioner is empty when the source does not report it.
	Provisioner string `json:"provisioner"`
}
  26. // Clone creates a copy of ClusterInfo and returns it
  27. func (ci *ClusterInfo) Clone() *ClusterInfo {
  28. if ci == nil {
  29. return nil
  30. }
  31. return &ClusterInfo{
  32. ID: ci.ID,
  33. Name: ci.Name,
  34. Profile: ci.Profile,
  35. Provider: ci.Provider,
  36. Provisioner: ci.Provisioner,
  37. }
  38. }
// ClusterMap provides lookups of known clusters by cluster ID, plus helpers for
// composing and splitting "<clusterName>/<clusterID>" identifiers.
type ClusterMap interface {
	// GetClusterIDs returns a slice containing all of the cluster identifiers.
	GetClusterIDs() []string

	// AsMap returns the cluster map as a standard go map
	AsMap() map[string]*ClusterInfo

	// InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
	// doesn't exist
	InfoFor(clusterID string) *ClusterInfo

	// NameFor returns the name of the cluster provided the clusterID.
	NameFor(clusterID string) string

	// NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
	// assigned name. Otherwise, just the clusterID is returned.
	NameIDFor(clusterID string) string

	// SplitNameID splits the nameID back into a separate id and name field
	SplitNameID(nameID string) (id string, name string)

	// StopRefresh stops the automatic internal map refresh
	StopRefresh()
}
// LocalClusterInfoProvider is a contract which is capable of performing local cluster info lookups.
// PrometheusClusterMap uses it as a fallback when the local cluster has no
// kubecost_cluster_info series.
type LocalClusterInfoProvider interface {
	// GetClusterInfo returns a string map containing the local cluster info.
	// PrometheusClusterMap requires the "id" and "name" keys and optionally
	// reads "clusterProfile", "provider" and "provisioner".
	GetClusterInfo() map[string]string
}
// PrometheusClusterMap is a ClusterMap that keeps records of all known
// cost-model clusters, loaded from the kubecost_cluster_info metric via a
// Prometheus or Thanos client and refreshed periodically in the background.
type PrometheusClusterMap struct {
	// lock guards clusters: readers take RLock, refreshClusters takes Lock to swap the map.
	lock *sync.RWMutex
	// client is the Prometheus/Thanos client used for the cluster info query.
	client prometheus.Client
	// clusters maps cluster ID to its ClusterInfo.
	clusters map[string]*ClusterInfo
	// localCluster supplies info for the local cluster when the metric lacks it.
	localCluster LocalClusterInfoProvider
	// stop is closed by StopRefresh to end the refresh goroutine started in NewClusterMap.
	stop chan struct{}
}
  70. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  71. func NewClusterMap(client prometheus.Client, lcip LocalClusterInfoProvider, refresh time.Duration) ClusterMap {
  72. stop := make(chan struct{})
  73. cm := &PrometheusClusterMap{
  74. lock: new(sync.RWMutex),
  75. client: client,
  76. clusters: make(map[string]*ClusterInfo),
  77. localCluster: lcip,
  78. stop: stop,
  79. }
  80. // Run an updater to ensure cluster data stays relevant over time
  81. go func() {
  82. // Immediately Attempt to refresh the clusters
  83. cm.refreshClusters()
  84. // Tick on interval and refresh clusters
  85. ticker := time.NewTicker(refresh)
  86. for {
  87. select {
  88. case <-ticker.C:
  89. cm.refreshClusters()
  90. case <-cm.stop:
  91. log.Infof("ClusterMap refresh stopped.")
  92. return
  93. }
  94. }
  95. }()
  96. return cm
  97. }
  98. // clusterInfoQuery returns the query string to load cluster info
  99. func clusterInfoQuery(offset string) string {
  100. return fmt.Sprintf("kubecost_cluster_info%s", offset)
  101. }
  102. // loadClusters loads all the cluster info to map
  103. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
  104. var offset string = ""
  105. if prom.IsThanos(pcm.client) {
  106. offset = thanos.QueryOffset()
  107. }
  108. // Execute Query
  109. tryQuery := func() (interface{}, error) {
  110. ctx := prom.NewNamedContext(pcm.client, prom.ClusterMapContextName)
  111. r, _, e := ctx.QuerySync(clusterInfoQuery(offset))
  112. return r, e
  113. }
  114. // Retry on failure
  115. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  116. qr, ok := result.([]*prom.QueryResult)
  117. if !ok || err != nil {
  118. return nil, err
  119. }
  120. clusters := make(map[string]*ClusterInfo)
  121. // Load the query results. Critical fields are id and name.
  122. for _, result := range qr {
  123. id, err := result.GetString("id")
  124. if err != nil {
  125. log.Warningf("Failed to load 'id' field for ClusterInfo")
  126. continue
  127. }
  128. name, err := result.GetString("name")
  129. if err != nil {
  130. log.Warningf("Failed to load 'name' field for ClusterInfo")
  131. continue
  132. }
  133. profile, err := result.GetString("clusterprofile")
  134. if err != nil {
  135. profile = ""
  136. }
  137. provider, err := result.GetString("provider")
  138. if err != nil {
  139. provider = ""
  140. }
  141. provisioner, err := result.GetString("provisioner")
  142. if err != nil {
  143. provisioner = ""
  144. }
  145. clusters[id] = &ClusterInfo{
  146. ID: id,
  147. Name: name,
  148. Profile: profile,
  149. Provider: provider,
  150. Provisioner: provisioner,
  151. }
  152. }
  153. // populate the local cluster if it doesn't exist
  154. localID := env.GetClusterID()
  155. if _, ok := clusters[localID]; !ok {
  156. localInfo, err := pcm.getLocalClusterInfo()
  157. if err != nil {
  158. log.Warningf("Failed to load local cluster info: %s", err)
  159. } else {
  160. clusters[localInfo.ID] = localInfo
  161. }
  162. }
  163. return clusters, nil
  164. }
  165. // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
  166. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
  167. info := pcm.localCluster.GetClusterInfo()
  168. var id string
  169. var name string
  170. if i, ok := info["id"]; ok {
  171. id = i
  172. } else {
  173. return nil, fmt.Errorf("Local Cluster Info Missing ID")
  174. }
  175. if n, ok := info["name"]; ok {
  176. name = n
  177. } else {
  178. return nil, fmt.Errorf("Local Cluster Info Missing Name")
  179. }
  180. var clusterProfile string
  181. var provider string
  182. var provisioner string
  183. if cp, ok := info["clusterProfile"]; ok {
  184. clusterProfile = cp
  185. }
  186. if pvdr, ok := info["provider"]; ok {
  187. provider = pvdr
  188. }
  189. if pvsr, ok := info["provisioner"]; ok {
  190. provisioner = pvsr
  191. }
  192. return &ClusterInfo{
  193. ID: id,
  194. Name: name,
  195. Profile: clusterProfile,
  196. Provider: provider,
  197. Provisioner: provisioner,
  198. }, nil
  199. }
  200. // refreshClusters loads the clusters and updates the internal map
  201. func (pcm *PrometheusClusterMap) refreshClusters() {
  202. updated, err := pcm.loadClusters()
  203. if err != nil {
  204. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  205. return
  206. }
  207. pcm.lock.Lock()
  208. pcm.clusters = updated
  209. pcm.lock.Unlock()
  210. }
  211. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  212. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  213. pcm.lock.RLock()
  214. defer pcm.lock.RUnlock()
  215. var clusterIDs []string
  216. for id := range pcm.clusters {
  217. clusterIDs = append(clusterIDs, id)
  218. }
  219. return clusterIDs
  220. }
  221. // AsMap returns the cluster map as a standard go map
  222. func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
  223. pcm.lock.RLock()
  224. defer pcm.lock.RUnlock()
  225. m := make(map[string]*ClusterInfo)
  226. for k, v := range pcm.clusters {
  227. m[k] = v.Clone()
  228. }
  229. return m
  230. }
  231. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  232. // doesn't exist
  233. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
  234. pcm.lock.RLock()
  235. defer pcm.lock.RUnlock()
  236. if info, ok := pcm.clusters[clusterID]; ok {
  237. return info.Clone()
  238. }
  239. return nil
  240. }
  241. // NameFor returns the name of the cluster provided the clusterID.
  242. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  243. pcm.lock.RLock()
  244. defer pcm.lock.RUnlock()
  245. if info, ok := pcm.clusters[clusterID]; ok {
  246. return info.Name
  247. }
  248. return ""
  249. }
  250. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  251. // assigned name. Otherwise, just the clusterID is returned.
  252. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  253. pcm.lock.RLock()
  254. defer pcm.lock.RUnlock()
  255. if info, ok := pcm.clusters[clusterID]; ok {
  256. if info.Name == "" {
  257. return clusterID
  258. }
  259. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  260. }
  261. return clusterID
  262. }
  263. func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
  264. if !strings.Contains(nameID, "/") {
  265. id = nameID
  266. name = ""
  267. return
  268. }
  269. split := strings.Split(nameID, "/")
  270. name = split[0]
  271. id = split[1]
  272. return
  273. }
  274. // StopRefresh stops the automatic internal map refresh
  275. func (pcm *PrometheusClusterMap) StopRefresh() {
  276. if pcm.stop != nil {
  277. close(pcm.stop)
  278. pcm.stop = nil
  279. }
  280. }