clustermap.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package prom
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/opencost/opencost/modules/prometheus-source/pkg/env"
  9. "github.com/opencost/opencost/modules/prometheus-source/pkg/thanos"
  10. "github.com/opencost/opencost/core/pkg/clusters"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/source"
  13. "github.com/opencost/opencost/core/pkg/util/retry"
  14. )
  // Retry settings handed to retry.Retry by loadClusters when the cluster info
  // query is executed.
  const (
  	// LoadRetries is the retry count passed to retry.Retry for the cluster
  	// info query.
  	LoadRetries = int(6)

  	// LoadRetryDelay is the delay passed to retry.Retry for the cluster info
  	// query.
  	LoadRetryDelay = 10 * time.Second
  )
  19. // PrometheusClusterMap keeps records of all known cost-model clusters, loaded via a query and refreshed by a background goroutine.
  20. type PrometheusClusterMap struct {
  21. lock sync.RWMutex // guards reads and replacement of the clusters map
  22. contextFactory *ContextFactory // creates the named query context used to load cluster info
  23. clusters map[string]*clusters.ClusterInfo // known clusters keyed by cluster ID
  24. clusterInfo clusters.ClusterInfoProvider // supplies the local cluster's info, added when the query results lack it
  25. stop chan struct{} // closed by StopRefresh to end the background refresh loop
  26. }
  27. // newPrometheusClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  28. func newPrometheusClusterMap(contextFactory *ContextFactory, cip clusters.ClusterInfoProvider, refresh time.Duration) clusters.ClusterMap {
  29. stop := make(chan struct{})
  30. cm := &PrometheusClusterMap{
  31. contextFactory: contextFactory,
  32. clusters: make(map[string]*clusters.ClusterInfo),
  33. clusterInfo: cip,
  34. stop: stop,
  35. }
  36. // Run an updater to ensure cluster data stays relevant over time
  37. go func() {
  38. // Immediately Attempt to refresh the clusters
  39. cm.refreshClusters()
  40. // Tick on interval and refresh clusters
  41. ticker := time.NewTicker(refresh)
  42. for {
  43. select {
  44. case <-ticker.C:
  45. cm.refreshClusters()
  46. case <-cm.stop:
  47. log.Infof("ClusterMap refresh stopped.")
  48. return
  49. }
  50. }
  51. }()
  52. return cm
  53. }
  54. // clusterInfoQuery returns the query string to load cluster info
  55. func clusterInfoQuery(offset string) string {
  56. return fmt.Sprintf("kubecost_cluster_info{%s}%s", env.GetPromClusterFilter(), offset)
  57. }
  58. // loadClusters loads all the cluster info to map
  59. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*clusters.ClusterInfo, error) {
  60. var offset string = ""
  61. if IsThanos(pcm.contextFactory.client) {
  62. offset = thanos.QueryOffset()
  63. }
  64. // Execute Query
  65. tryQuery := func() (interface{}, error) {
  66. ctx := pcm.contextFactory.NewNamedContext(ClusterMapContextName)
  67. resCh := ctx.QueryAtTime(clusterInfoQuery(offset), time.Now())
  68. r, e := resCh.Await()
  69. return r, e
  70. }
  71. // Retry on failure
  72. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  73. qr, ok := result.([]*source.QueryResult)
  74. if !ok || err != nil {
  75. return nil, err
  76. }
  77. allClusters := make(map[string]*clusters.ClusterInfo)
  78. // Load the query results. Critical fields are id and name.
  79. for _, result := range qr {
  80. id, err := result.GetString("id")
  81. if err != nil {
  82. log.Warnf("Failed to load 'id' field for ClusterInfo")
  83. continue
  84. }
  85. name, err := result.GetString("name")
  86. if err != nil {
  87. log.Warnf("Failed to load 'name' field for ClusterInfo")
  88. continue
  89. }
  90. profile, err := result.GetString("clusterprofile")
  91. if err != nil {
  92. profile = ""
  93. }
  94. provider, err := result.GetString("provider")
  95. if err != nil {
  96. provider = ""
  97. }
  98. account, err := result.GetString("account")
  99. if err != nil {
  100. account = ""
  101. }
  102. project, err := result.GetString("project")
  103. if err != nil {
  104. project = ""
  105. }
  106. region, err := result.GetString("region")
  107. if err != nil {
  108. region = ""
  109. }
  110. provisioner, err := result.GetString("provisioner")
  111. if err != nil {
  112. provisioner = ""
  113. }
  114. allClusters[id] = &clusters.ClusterInfo{
  115. ID: id,
  116. Name: name,
  117. Profile: profile,
  118. Provider: provider,
  119. Account: account,
  120. Project: project,
  121. Region: region,
  122. Provisioner: provisioner,
  123. }
  124. }
  125. // populate the local cluster if it doesn't exist
  126. localInfo, err := pcm.getLocalClusterInfo()
  127. if err != nil {
  128. return allClusters, nil
  129. }
  130. // Check to see if the local cluster's id is part of our loaded clusters, and include if not
  131. if _, ok := allClusters[localInfo.ID]; !ok {
  132. allClusters[localInfo.ID] = localInfo
  133. }
  134. return allClusters, nil
  135. }
  136. // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
  137. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*clusters.ClusterInfo, error) {
  138. info := pcm.clusterInfo.GetClusterInfo()
  139. clusterInfo, err := clusters.MapToClusterInfo(info)
  140. if err != nil {
  141. return nil, fmt.Errorf("parsing local cluster info failed: %w", err)
  142. }
  143. return clusterInfo, nil
  144. }
  145. // refreshClusters loads the clusters and updates the internal map
  146. func (pcm *PrometheusClusterMap) refreshClusters() {
  147. updated, err := pcm.loadClusters()
  148. if err != nil {
  149. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  150. return
  151. }
  152. pcm.lock.Lock()
  153. pcm.clusters = updated
  154. pcm.lock.Unlock()
  155. }
  156. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  157. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  158. pcm.lock.RLock()
  159. defer pcm.lock.RUnlock()
  160. var clusterIDs []string
  161. for id := range pcm.clusters {
  162. clusterIDs = append(clusterIDs, id)
  163. }
  164. return clusterIDs
  165. }
  166. // AsMap returns the cluster map as a standard go map
  167. func (pcm *PrometheusClusterMap) AsMap() map[string]*clusters.ClusterInfo {
  168. pcm.lock.RLock()
  169. defer pcm.lock.RUnlock()
  170. m := make(map[string]*clusters.ClusterInfo)
  171. for k, v := range pcm.clusters {
  172. m[k] = v.Clone()
  173. }
  174. return m
  175. }
  176. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  177. // doesn't exist
  178. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *clusters.ClusterInfo {
  179. pcm.lock.RLock()
  180. defer pcm.lock.RUnlock()
  181. if info, ok := pcm.clusters[clusterID]; ok {
  182. return info.Clone()
  183. }
  184. return nil
  185. }
  186. // NameFor returns the name of the cluster provided the clusterID.
  187. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  188. pcm.lock.RLock()
  189. defer pcm.lock.RUnlock()
  190. if info, ok := pcm.clusters[clusterID]; ok {
  191. return info.Name
  192. }
  193. return ""
  194. }
  195. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  196. // assigned name. Otherwise, just the clusterID is returned.
  197. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  198. pcm.lock.RLock()
  199. defer pcm.lock.RUnlock()
  200. if info, ok := pcm.clusters[clusterID]; ok {
  201. if info.Name == "" {
  202. return clusterID
  203. }
  204. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  205. }
  206. return clusterID
  207. }
  // SplitNameID is a helper that splits an identifier in the
  // "<clusterName>/<clusterID>" format and returns the cluster ID and the
  // cluster name, in that order.
  //
  // The input is split at the first "/" only, so everything after that
  // separator is returned as the ID, including any further "/" characters.
  // An input without "/" is returned unchanged as the ID with an empty name.
  func SplitNameID(nameID string) (id string, name string) {
  	sep := strings.Index(nameID, "/")
  	if sep < 0 {
  		return nameID, ""
  	}

  	return nameID[sep+1:], nameID[:sep]
  }
  220. // StopRefresh stops the automatic internal map refresh
  221. func (pcm *PrometheusClusterMap) StopRefresh() {
  222. if pcm.stop != nil {
  223. close(pcm.stop)
  224. pcm.stop = nil
  225. }
  226. }