clustermap.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. package clusters
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. "github.com/kubecost/cost-model/pkg/prom"
  10. "github.com/kubecost/cost-model/pkg/thanos"
  11. "github.com/kubecost/cost-model/pkg/util/retry"
  12. prometheus "github.com/prometheus/client_golang/api"
  13. )
  // Retry settings used when the cluster info query is executed.
  const (
  	// LoadRetries is the count handed to retry.Retry for the cluster info query.
  	LoadRetries int = 6

  	// LoadRetryDelay is the delay handed to retry.Retry for the cluster info query.
  	LoadRetryDelay time.Duration = 10 * time.Second
  )
  // ClusterInfo holds the attributes of a single cluster, as read from the
  // cluster info metric in Prometheus or from the local cluster configuration.
  type ClusterInfo struct {
  	// Identity of the cluster; both values are required when an entry is loaded.
  	ID   string `json:"id"`
  	Name string `json:"name"`

  	// Descriptive attributes; each is left empty when the source does not supply it.
  	Profile     string `json:"profile"`
  	Provider    string `json:"provider"`
  	Account     string `json:"account"`
  	Project     string `json:"project"`
  	Region      string `json:"region"`
  	Provisioner string `json:"provisioner"`
  }
  29. // Clone creates a copy of ClusterInfo and returns it
  30. func (ci *ClusterInfo) Clone() *ClusterInfo {
  31. if ci == nil {
  32. return nil
  33. }
  34. return &ClusterInfo{
  35. ID: ci.ID,
  36. Name: ci.Name,
  37. Profile: ci.Profile,
  38. Provider: ci.Provider,
  39. Account: ci.Account,
  40. Project: ci.Project,
  41. Region: ci.Region,
  42. Provisioner: ci.Provisioner,
  43. }
  44. }
  45. type ClusterMap interface {
  46. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  47. GetClusterIDs() []string
  48. // AsMap returns the cluster map as a standard go map
  49. AsMap() map[string]*ClusterInfo
  50. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  51. // doesn't exist
  52. InfoFor(clusterID string) *ClusterInfo
  53. // NameFor returns the name of the cluster provided the clusterID.
  54. NameFor(clusterID string) string
  55. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  56. // assigned name. Otherwise, just the clusterID is returned.
  57. NameIDFor(clusterID string) string
  58. // SplitNameID splits the nameID back into a separate id and name field
  59. SplitNameID(nameID string) (id string, name string)
  60. // StopRefresh stops the automatic internal map refresh
  61. StopRefresh()
  62. }
  // ClusterInfoProvider is the contract for anything able to look up cluster info.
  type ClusterInfoProvider interface {
  	// GetClusterInfo returns the local/remote connected cluster info as a
  	// string-to-string map.
  	GetClusterInfo() map[string]string
  }
  68. // ClusterMap keeps records of all known cost-model clusters.
  69. type PrometheusClusterMap struct {
  70. lock *sync.RWMutex
  71. client prometheus.Client
  72. clusters map[string]*ClusterInfo
  73. clusterInfo ClusterInfoProvider
  74. stop chan struct{}
  75. }
  76. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  77. func NewClusterMap(client prometheus.Client, cip ClusterInfoProvider, refresh time.Duration) ClusterMap {
  78. stop := make(chan struct{})
  79. cm := &PrometheusClusterMap{
  80. lock: new(sync.RWMutex),
  81. client: client,
  82. clusters: make(map[string]*ClusterInfo),
  83. clusterInfo: cip,
  84. stop: stop,
  85. }
  86. // Run an updater to ensure cluster data stays relevant over time
  87. go func() {
  88. // Immediately Attempt to refresh the clusters
  89. cm.refreshClusters()
  90. // Tick on interval and refresh clusters
  91. ticker := time.NewTicker(refresh)
  92. for {
  93. select {
  94. case <-ticker.C:
  95. cm.refreshClusters()
  96. case <-cm.stop:
  97. log.Infof("ClusterMap refresh stopped.")
  98. return
  99. }
  100. }
  101. }()
  102. return cm
  103. }
  // clusterInfoQuery builds the query used to load cluster info: the cluster info
  // metric name with offset appended verbatim (offset may be empty).
  func clusterInfoQuery(offset string) string {
  	const metric = "kubecost_cluster_info"

  	// Both operands are strings, so fmt.Sprint joins them without a separator.
  	return fmt.Sprint(metric, offset)
  }
  108. // loadClusters loads all the cluster info to map
  109. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
  110. var offset string = ""
  111. if prom.IsThanos(pcm.client) {
  112. offset = thanos.QueryOffset()
  113. }
  114. // Execute Query
  115. tryQuery := func() (interface{}, error) {
  116. ctx := prom.NewNamedContext(pcm.client, prom.ClusterMapContextName)
  117. r, _, e := ctx.QuerySync(clusterInfoQuery(offset))
  118. return r, e
  119. }
  120. // Retry on failure
  121. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  122. qr, ok := result.([]*prom.QueryResult)
  123. if !ok || err != nil {
  124. return nil, err
  125. }
  126. clusters := make(map[string]*ClusterInfo)
  127. // Load the query results. Critical fields are id and name.
  128. for _, result := range qr {
  129. id, err := result.GetString("id")
  130. if err != nil {
  131. log.Warningf("Failed to load 'id' field for ClusterInfo")
  132. continue
  133. }
  134. name, err := result.GetString("name")
  135. if err != nil {
  136. log.Warningf("Failed to load 'name' field for ClusterInfo")
  137. continue
  138. }
  139. profile, err := result.GetString("clusterprofile")
  140. if err != nil {
  141. profile = ""
  142. }
  143. provider, err := result.GetString("provider")
  144. if err != nil {
  145. provider = ""
  146. }
  147. account, err := result.GetString("account")
  148. if err != nil {
  149. account = ""
  150. }
  151. project, err := result.GetString("project")
  152. if err != nil {
  153. project = ""
  154. }
  155. region, err := result.GetString("region")
  156. if err != nil {
  157. region = ""
  158. }
  159. provisioner, err := result.GetString("provisioner")
  160. if err != nil {
  161. provisioner = ""
  162. }
  163. clusters[id] = &ClusterInfo{
  164. ID: id,
  165. Name: name,
  166. Profile: profile,
  167. Provider: provider,
  168. Account: account,
  169. Project: project,
  170. Region: region,
  171. Provisioner: provisioner,
  172. }
  173. }
  174. // populate the local cluster if it doesn't exist
  175. localInfo, err := pcm.getLocalClusterInfo()
  176. if err != nil {
  177. return clusters, nil
  178. }
  179. // Check to see if the local cluster's id is part of our loaded clusters, and include if not
  180. if _, ok := clusters[localInfo.ID]; !ok {
  181. clusters[localInfo.ID] = localInfo
  182. }
  183. return clusters, nil
  184. }
  185. // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
  186. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
  187. info := pcm.clusterInfo.GetClusterInfo()
  188. var id string
  189. var name string
  190. if i, ok := info["id"]; ok {
  191. id = i
  192. } else {
  193. return nil, fmt.Errorf("Local Cluster Info Missing ID")
  194. }
  195. if n, ok := info["name"]; ok {
  196. name = n
  197. } else {
  198. return nil, fmt.Errorf("Local Cluster Info Missing Name")
  199. }
  200. var clusterProfile string
  201. var provider string
  202. var account string
  203. var project string
  204. var region string
  205. var provisioner string
  206. if cp, ok := info["clusterProfile"]; ok {
  207. clusterProfile = cp
  208. }
  209. if pvdr, ok := info["provider"]; ok {
  210. provider = pvdr
  211. }
  212. if acct, ok := info["account"]; ok {
  213. account = acct
  214. }
  215. if proj, ok := info["project"]; ok {
  216. project = proj
  217. }
  218. if reg, ok := info["region"]; ok {
  219. region = reg
  220. }
  221. if pvsr, ok := info["provisioner"]; ok {
  222. provisioner = pvsr
  223. }
  224. return &ClusterInfo{
  225. ID: id,
  226. Name: name,
  227. Profile: clusterProfile,
  228. Provider: provider,
  229. Account: account,
  230. Project: project,
  231. Region: region,
  232. Provisioner: provisioner,
  233. }, nil
  234. }
  235. // refreshClusters loads the clusters and updates the internal map
  236. func (pcm *PrometheusClusterMap) refreshClusters() {
  237. updated, err := pcm.loadClusters()
  238. if err != nil {
  239. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  240. return
  241. }
  242. pcm.lock.Lock()
  243. pcm.clusters = updated
  244. pcm.lock.Unlock()
  245. }
  246. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  247. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  248. pcm.lock.RLock()
  249. defer pcm.lock.RUnlock()
  250. var clusterIDs []string
  251. for id := range pcm.clusters {
  252. clusterIDs = append(clusterIDs, id)
  253. }
  254. return clusterIDs
  255. }
  256. // AsMap returns the cluster map as a standard go map
  257. func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
  258. pcm.lock.RLock()
  259. defer pcm.lock.RUnlock()
  260. m := make(map[string]*ClusterInfo)
  261. for k, v := range pcm.clusters {
  262. m[k] = v.Clone()
  263. }
  264. return m
  265. }
  266. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  267. // doesn't exist
  268. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
  269. pcm.lock.RLock()
  270. defer pcm.lock.RUnlock()
  271. if info, ok := pcm.clusters[clusterID]; ok {
  272. return info.Clone()
  273. }
  274. return nil
  275. }
  276. // NameFor returns the name of the cluster provided the clusterID.
  277. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  278. pcm.lock.RLock()
  279. defer pcm.lock.RUnlock()
  280. if info, ok := pcm.clusters[clusterID]; ok {
  281. return info.Name
  282. }
  283. return ""
  284. }
  285. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  286. // assigned name. Otherwise, just the clusterID is returned.
  287. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  288. pcm.lock.RLock()
  289. defer pcm.lock.RUnlock()
  290. if info, ok := pcm.clusters[clusterID]; ok {
  291. if info.Name == "" {
  292. return clusterID
  293. }
  294. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  295. }
  296. return clusterID
  297. }
  298. func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
  299. if !strings.Contains(nameID, "/") {
  300. id = nameID
  301. name = ""
  302. return
  303. }
  304. split := strings.Split(nameID, "/")
  305. name = split[0]
  306. id = split[1]
  307. return
  308. }
  309. // StopRefresh stops the automatic internal map refresh
  310. func (pcm *PrometheusClusterMap) StopRefresh() {
  311. if pcm.stop != nil {
  312. close(pcm.stop)
  313. pcm.stop = nil
  314. }
  315. }