clustermap.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package clusters
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. "github.com/kubecost/cost-model/pkg/prom"
  10. "github.com/kubecost/cost-model/pkg/thanos"
  11. "github.com/kubecost/cost-model/pkg/util/retry"
  12. prometheus "github.com/prometheus/client_golang/api"
  13. )
  // Retry settings used by loadClusters when running the cluster info query.
  const (
  	// LoadRetries is the retry count handed to retry.Retry for the cluster
  	// info query.
  	LoadRetries int = 6
  	// LoadRetryDelay is the delay handed to retry.Retry alongside LoadRetries.
  	LoadRetryDelay time.Duration = 10 * time.Second
  )
  // ClusterInfo holds the identifying fields recorded for a single cluster.
  type ClusterInfo struct {
  	ID          string `json:"id"`
  	Name        string `json:"name"`
  	Profile     string `json:"profile"`
  	Provider    string `json:"provider"`
  	Provisioner string `json:"provisioner"`
  }

  // Clone returns a pointer to a fresh copy of ci, or nil when ci itself is nil.
  func (ci *ClusterInfo) Clone() *ClusterInfo {
  	if ci == nil {
  		return nil
  	}

  	// Every field is a plain string, so a value copy duplicates the whole struct.
  	dup := *ci
  	return &dup
  }
  // ClusterMap describes lookups over the set of known clusters, plus control
  // over the automatic refresh of that set.
  type ClusterMap interface {
  	// GetClusterIDs returns a slice containing all of the cluster identifiers.
  	GetClusterIDs() []string

  	// AsMap returns the cluster map as a standard go map, keyed by cluster id.
  	AsMap() map[string]*ClusterInfo

  	// InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  	// doesn't exist.
  	InfoFor(clusterID string) *ClusterInfo

  	// NameFor returns the name of the cluster provided the clusterID.
  	NameFor(clusterID string) string

  	// NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  	// assigned name. Otherwise, just the clusterID is returned.
  	NameIDFor(clusterID string) string

  	// SplitNameID splits the nameID back into a separate id and name field.
  	SplitNameID(nameID string) (id string, name string)

  	// StopRefresh stops the automatic internal map refresh.
  	StopRefresh()
  }
  // ClusterInfoProvider is a contract which is capable of performing cluster info lookups.
  type ClusterInfoProvider interface {
  	// GetClusterInfo returns a string map containing the local/remote connected cluster info.
  	// Within this file the keys read from the result are "id", "name",
  	// "clusterProfile", "provider" and "provisioner".
  	GetClusterInfo() map[string]string
  }
  // PrometheusClusterMap keeps records of all known cost-model clusters. The
  // records are loaded by querying through the configured client; instances
  // built with NewClusterMap also reload them periodically.
  type PrometheusClusterMap struct {
  	// lock guards clusters.
  	lock *sync.RWMutex
  	// client is the client the cluster info query is executed against.
  	client prometheus.Client
  	// clusters maps a cluster id to its info; it is replaced wholesale on each
  	// successful refresh.
  	clusters map[string]*ClusterInfo
  	// clusterInfo supplies the local cluster's info, which is added to the
  	// loaded set when the query results do not already contain its id.
  	clusterInfo ClusterInfoProvider
  	// stop is closed by StopRefresh to end the refresh goroutine started by
  	// NewClusterMap.
  	stop chan struct{}
  }
  69. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  70. func NewClusterMap(client prometheus.Client, cip ClusterInfoProvider, refresh time.Duration) ClusterMap {
  71. stop := make(chan struct{})
  72. cm := &PrometheusClusterMap{
  73. lock: new(sync.RWMutex),
  74. client: client,
  75. clusters: make(map[string]*ClusterInfo),
  76. clusterInfo: cip,
  77. stop: stop,
  78. }
  79. // Run an updater to ensure cluster data stays relevant over time
  80. go func() {
  81. // Immediately Attempt to refresh the clusters
  82. cm.refreshClusters()
  83. // Tick on interval and refresh clusters
  84. ticker := time.NewTicker(refresh)
  85. for {
  86. select {
  87. case <-ticker.C:
  88. cm.refreshClusters()
  89. case <-cm.stop:
  90. log.Infof("ClusterMap refresh stopped.")
  91. return
  92. }
  93. }
  94. }()
  95. return cm
  96. }
  // clusterInfoQuery builds the query string used to load cluster info, with
  // the given offset text appended directly after the metric name.
  func clusterInfoQuery(offset string) string {
  	const queryFormat = "kubecost_cluster_info%s"

  	return fmt.Sprintf(queryFormat, offset)
  }
  101. // loadClusters loads all the cluster info to map
  102. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
  103. var offset string = ""
  104. if prom.IsThanos(pcm.client) {
  105. offset = thanos.QueryOffset()
  106. }
  107. // Execute Query
  108. tryQuery := func() (interface{}, error) {
  109. ctx := prom.NewNamedContext(pcm.client, prom.ClusterMapContextName)
  110. r, _, e := ctx.QuerySync(clusterInfoQuery(offset))
  111. return r, e
  112. }
  113. // Retry on failure
  114. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  115. qr, ok := result.([]*prom.QueryResult)
  116. if !ok || err != nil {
  117. return nil, err
  118. }
  119. clusters := make(map[string]*ClusterInfo)
  120. // Load the query results. Critical fields are id and name.
  121. for _, result := range qr {
  122. id, err := result.GetString("id")
  123. if err != nil {
  124. log.Warningf("Failed to load 'id' field for ClusterInfo")
  125. continue
  126. }
  127. name, err := result.GetString("name")
  128. if err != nil {
  129. log.Warningf("Failed to load 'name' field for ClusterInfo")
  130. continue
  131. }
  132. profile, err := result.GetString("clusterprofile")
  133. if err != nil {
  134. profile = ""
  135. }
  136. provider, err := result.GetString("provider")
  137. if err != nil {
  138. provider = ""
  139. }
  140. provisioner, err := result.GetString("provisioner")
  141. if err != nil {
  142. provisioner = ""
  143. }
  144. clusters[id] = &ClusterInfo{
  145. ID: id,
  146. Name: name,
  147. Profile: profile,
  148. Provider: provider,
  149. Provisioner: provisioner,
  150. }
  151. }
  152. // populate the local cluster if it doesn't exist
  153. localInfo, err := pcm.getLocalClusterInfo()
  154. if err != nil {
  155. return clusters, nil
  156. }
  157. // Check to see if the local cluster's id is part of our loaded clusters, and include if not
  158. if _, ok := clusters[localInfo.ID]; !ok {
  159. clusters[localInfo.ID] = localInfo
  160. }
  161. return clusters, nil
  162. }
  163. // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
  164. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
  165. info := pcm.clusterInfo.GetClusterInfo()
  166. var id string
  167. var name string
  168. if i, ok := info["id"]; ok {
  169. id = i
  170. } else {
  171. return nil, fmt.Errorf("Local Cluster Info Missing ID")
  172. }
  173. if n, ok := info["name"]; ok {
  174. name = n
  175. } else {
  176. return nil, fmt.Errorf("Local Cluster Info Missing Name")
  177. }
  178. var clusterProfile string
  179. var provider string
  180. var provisioner string
  181. if cp, ok := info["clusterProfile"]; ok {
  182. clusterProfile = cp
  183. }
  184. if pvdr, ok := info["provider"]; ok {
  185. provider = pvdr
  186. }
  187. if pvsr, ok := info["provisioner"]; ok {
  188. provisioner = pvsr
  189. }
  190. return &ClusterInfo{
  191. ID: id,
  192. Name: name,
  193. Profile: clusterProfile,
  194. Provider: provider,
  195. Provisioner: provisioner,
  196. }, nil
  197. }
  198. // refreshClusters loads the clusters and updates the internal map
  199. func (pcm *PrometheusClusterMap) refreshClusters() {
  200. updated, err := pcm.loadClusters()
  201. if err != nil {
  202. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  203. return
  204. }
  205. pcm.lock.Lock()
  206. pcm.clusters = updated
  207. pcm.lock.Unlock()
  208. }
  209. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  210. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  211. pcm.lock.RLock()
  212. defer pcm.lock.RUnlock()
  213. var clusterIDs []string
  214. for id := range pcm.clusters {
  215. clusterIDs = append(clusterIDs, id)
  216. }
  217. return clusterIDs
  218. }
  219. // AsMap returns the cluster map as a standard go map
  220. func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
  221. pcm.lock.RLock()
  222. defer pcm.lock.RUnlock()
  223. m := make(map[string]*ClusterInfo)
  224. for k, v := range pcm.clusters {
  225. m[k] = v.Clone()
  226. }
  227. return m
  228. }
  229. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  230. // doesn't exist
  231. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
  232. pcm.lock.RLock()
  233. defer pcm.lock.RUnlock()
  234. if info, ok := pcm.clusters[clusterID]; ok {
  235. return info.Clone()
  236. }
  237. return nil
  238. }
  239. // NameFor returns the name of the cluster provided the clusterID.
  240. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  241. pcm.lock.RLock()
  242. defer pcm.lock.RUnlock()
  243. if info, ok := pcm.clusters[clusterID]; ok {
  244. return info.Name
  245. }
  246. return ""
  247. }
  248. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  249. // assigned name. Otherwise, just the clusterID is returned.
  250. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  251. pcm.lock.RLock()
  252. defer pcm.lock.RUnlock()
  253. if info, ok := pcm.clusters[clusterID]; ok {
  254. if info.Name == "" {
  255. return clusterID
  256. }
  257. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  258. }
  259. return clusterID
  260. }
  261. func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
  262. if !strings.Contains(nameID, "/") {
  263. id = nameID
  264. name = ""
  265. return
  266. }
  267. split := strings.Split(nameID, "/")
  268. name = split[0]
  269. id = split[1]
  270. return
  271. }
  272. // StopRefresh stops the automatic internal map refresh
  273. func (pcm *PrometheusClusterMap) StopRefresh() {
  274. if pcm.stop != nil {
  275. close(pcm.stop)
  276. pcm.stop = nil
  277. }
  278. }