clustermap.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. package clusters
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/env"
  9. "github.com/kubecost/cost-model/pkg/log"
  10. "github.com/kubecost/cost-model/pkg/prom"
  11. "github.com/kubecost/cost-model/pkg/thanos"
  12. "github.com/kubecost/cost-model/pkg/util/retry"
  13. prometheus "github.com/prometheus/client_golang/api"
  14. )
  15. const (
  16. LoadRetries int = 6
  17. LoadRetryDelay time.Duration = 10 * time.Second
  18. )
  19. // ClusterInfo holds attributes of Cluster from metrics pulled from Prometheus
  20. type ClusterInfo struct {
  21. ID string `json:"id"`
  22. Name string `json:"name"`
  23. Profile string `json:"profile"`
  24. Provider string `json:"provider"`
  25. Account string `json:"account"`
  26. Project string `json:"project"`
  27. Region string `json:"region"`
  28. Provisioner string `json:"provisioner"`
  29. }
  30. // Clone creates a copy of ClusterInfo and returns it
  31. func (ci *ClusterInfo) Clone() *ClusterInfo {
  32. if ci == nil {
  33. return nil
  34. }
  35. return &ClusterInfo{
  36. ID: ci.ID,
  37. Name: ci.Name,
  38. Profile: ci.Profile,
  39. Provider: ci.Provider,
  40. Account: ci.Account,
  41. Project: ci.Project,
  42. Region: ci.Region,
  43. Provisioner: ci.Provisioner,
  44. }
  45. }
  46. type ClusterMap interface {
  47. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  48. GetClusterIDs() []string
  49. // AsMap returns the cluster map as a standard go map
  50. AsMap() map[string]*ClusterInfo
  51. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  52. // doesn't exist
  53. InfoFor(clusterID string) *ClusterInfo
  54. // NameFor returns the name of the cluster provided the clusterID.
  55. NameFor(clusterID string) string
  56. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  57. // assigned name. Otherwise, just the clusterID is returned.
  58. NameIDFor(clusterID string) string
  59. // SplitNameID splits the nameID back into a separate id and name field
  60. SplitNameID(nameID string) (id string, name string)
  61. // StopRefresh stops the automatic internal map refresh
  62. StopRefresh()
  63. }
  64. // LocalClusterInfoProvider is a contract which is capable of performing local cluster info lookups.
  65. type LocalClusterInfoProvider interface {
  66. // GetClusterInfo returns a string map containing the local cluster info
  67. GetClusterInfo() map[string]string
  68. }
  69. // ClusterMap keeps records of all known cost-model clusters.
  70. type PrometheusClusterMap struct {
  71. lock *sync.RWMutex
  72. client prometheus.Client
  73. clusters map[string]*ClusterInfo
  74. localCluster LocalClusterInfoProvider
  75. stop chan struct{}
  76. }
  77. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  78. func NewClusterMap(client prometheus.Client, lcip LocalClusterInfoProvider, refresh time.Duration) ClusterMap {
  79. stop := make(chan struct{})
  80. cm := &PrometheusClusterMap{
  81. lock: new(sync.RWMutex),
  82. client: client,
  83. clusters: make(map[string]*ClusterInfo),
  84. localCluster: lcip,
  85. stop: stop,
  86. }
  87. // Run an updater to ensure cluster data stays relevant over time
  88. go func() {
  89. // Immediately Attempt to refresh the clusters
  90. cm.refreshClusters()
  91. // Tick on interval and refresh clusters
  92. ticker := time.NewTicker(refresh)
  93. for {
  94. select {
  95. case <-ticker.C:
  96. cm.refreshClusters()
  97. case <-cm.stop:
  98. log.Infof("ClusterMap refresh stopped.")
  99. return
  100. }
  101. }
  102. }()
  103. return cm
  104. }
  105. // clusterInfoQuery returns the query string to load cluster info
  106. func clusterInfoQuery(offset string) string {
  107. return fmt.Sprintf("kubecost_cluster_info%s", offset)
  108. }
  109. // loadClusters loads all the cluster info to map
  110. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
  111. var offset string = ""
  112. if prom.IsThanos(pcm.client) {
  113. offset = thanos.QueryOffset()
  114. }
  115. // Execute Query
  116. tryQuery := func() (interface{}, error) {
  117. ctx := prom.NewNamedContext(pcm.client, prom.ClusterMapContextName)
  118. r, _, e := ctx.QuerySync(clusterInfoQuery(offset))
  119. return r, e
  120. }
  121. // Retry on failure
  122. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  123. qr, ok := result.([]*prom.QueryResult)
  124. if !ok || err != nil {
  125. return nil, err
  126. }
  127. clusters := make(map[string]*ClusterInfo)
  128. // Load the query results. Critical fields are id and name.
  129. for _, result := range qr {
  130. id, err := result.GetString("id")
  131. if err != nil {
  132. log.Warningf("Failed to load 'id' field for ClusterInfo")
  133. continue
  134. }
  135. name, err := result.GetString("name")
  136. if err != nil {
  137. log.Warningf("Failed to load 'name' field for ClusterInfo")
  138. continue
  139. }
  140. profile, err := result.GetString("clusterprofile")
  141. if err != nil {
  142. profile = ""
  143. }
  144. provider, err := result.GetString("provider")
  145. if err != nil {
  146. provider = ""
  147. }
  148. account, err := result.GetString("account")
  149. if err != nil {
  150. account = ""
  151. }
  152. project, err := result.GetString("project")
  153. if err != nil {
  154. project = ""
  155. }
  156. region, err := result.GetString("region")
  157. if err != nil {
  158. region = ""
  159. }
  160. provisioner, err := result.GetString("provisioner")
  161. if err != nil {
  162. provisioner = ""
  163. }
  164. clusters[id] = &ClusterInfo{
  165. ID: id,
  166. Name: name,
  167. Profile: profile,
  168. Provider: provider,
  169. Account: account,
  170. Project: project,
  171. Region: region,
  172. Provisioner: provisioner,
  173. }
  174. }
  175. // populate the local cluster if it doesn't exist
  176. localID := env.GetClusterID()
  177. if _, ok := clusters[localID]; !ok {
  178. localInfo, err := pcm.getLocalClusterInfo()
  179. if err != nil {
  180. log.Warningf("Failed to load local cluster info: %s", err)
  181. } else {
  182. clusters[localInfo.ID] = localInfo
  183. }
  184. }
  185. return clusters, nil
  186. }
  187. // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
  188. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
  189. info := pcm.localCluster.GetClusterInfo()
  190. var id string
  191. var name string
  192. if i, ok := info["id"]; ok {
  193. id = i
  194. } else {
  195. return nil, fmt.Errorf("Local Cluster Info Missing ID")
  196. }
  197. if n, ok := info["name"]; ok {
  198. name = n
  199. } else {
  200. return nil, fmt.Errorf("Local Cluster Info Missing Name")
  201. }
  202. var clusterProfile string
  203. var provider string
  204. var account string
  205. var project string
  206. var region string
  207. var provisioner string
  208. if cp, ok := info["clusterProfile"]; ok {
  209. clusterProfile = cp
  210. }
  211. if pvdr, ok := info["provider"]; ok {
  212. provider = pvdr
  213. }
  214. if acct, ok := info["account"]; ok {
  215. account = acct
  216. }
  217. if proj, ok := info["project"]; ok {
  218. project = proj
  219. }
  220. if reg, ok := info["region"]; ok {
  221. region = reg
  222. }
  223. if pvsr, ok := info["provisioner"]; ok {
  224. provisioner = pvsr
  225. }
  226. return &ClusterInfo{
  227. ID: id,
  228. Name: name,
  229. Profile: clusterProfile,
  230. Provider: provider,
  231. Account: account,
  232. Project: project,
  233. Region: region,
  234. Provisioner: provisioner,
  235. }, nil
  236. }
  237. // refreshClusters loads the clusters and updates the internal map
  238. func (pcm *PrometheusClusterMap) refreshClusters() {
  239. updated, err := pcm.loadClusters()
  240. if err != nil {
  241. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  242. return
  243. }
  244. pcm.lock.Lock()
  245. pcm.clusters = updated
  246. pcm.lock.Unlock()
  247. }
  248. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  249. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  250. pcm.lock.RLock()
  251. defer pcm.lock.RUnlock()
  252. var clusterIDs []string
  253. for id := range pcm.clusters {
  254. clusterIDs = append(clusterIDs, id)
  255. }
  256. return clusterIDs
  257. }
  258. // AsMap returns the cluster map as a standard go map
  259. func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
  260. pcm.lock.RLock()
  261. defer pcm.lock.RUnlock()
  262. m := make(map[string]*ClusterInfo)
  263. for k, v := range pcm.clusters {
  264. m[k] = v.Clone()
  265. }
  266. return m
  267. }
  268. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  269. // doesn't exist
  270. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
  271. pcm.lock.RLock()
  272. defer pcm.lock.RUnlock()
  273. if info, ok := pcm.clusters[clusterID]; ok {
  274. return info.Clone()
  275. }
  276. return nil
  277. }
  278. // NameFor returns the name of the cluster provided the clusterID.
  279. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  280. pcm.lock.RLock()
  281. defer pcm.lock.RUnlock()
  282. if info, ok := pcm.clusters[clusterID]; ok {
  283. return info.Name
  284. }
  285. return ""
  286. }
  287. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  288. // assigned name. Otherwise, just the clusterID is returned.
  289. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  290. pcm.lock.RLock()
  291. defer pcm.lock.RUnlock()
  292. if info, ok := pcm.clusters[clusterID]; ok {
  293. if info.Name == "" {
  294. return clusterID
  295. }
  296. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  297. }
  298. return clusterID
  299. }
  300. func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
  301. if !strings.Contains(nameID, "/") {
  302. id = nameID
  303. name = ""
  304. return
  305. }
  306. split := strings.Split(nameID, "/")
  307. name = split[0]
  308. id = split[1]
  309. return
  310. }
  311. // StopRefresh stops the automatic internal map refresh
  312. func (pcm *PrometheusClusterMap) StopRefresh() {
  313. if pcm.stop != nil {
  314. close(pcm.stop)
  315. pcm.stop = nil
  316. }
  317. }