clustermap.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. package clusters
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/opencost/opencost/pkg/env"
  9. "github.com/opencost/opencost/pkg/log"
  10. "github.com/opencost/opencost/pkg/prom"
  11. "github.com/opencost/opencost/pkg/thanos"
  12. "github.com/opencost/opencost/pkg/util/retry"
  13. prometheus "github.com/prometheus/client_golang/api"
  14. )
  // Retry policy applied when loading cluster info from Prometheus (see loadClusters).
  const (
  	// LoadRetries is the retry count handed to retry.Retry for the cluster info query.
  	LoadRetries int = 6
  	// LoadRetryDelay is the delay argument handed to retry.Retry (presumably the
  	// wait between attempts — confirm against the retry package).
  	LoadRetryDelay = 10 * time.Second
  )
  19. // prometheus query offset to apply to each non-range query
  20. // package scope to prevent calling duration parse each use
  21. var promQueryOffset = env.GetPrometheusQueryOffset()
  // ClusterInfo is the set of descriptive attributes recorded for one cluster.
  // Entries are built either from the labels of the kubecost_cluster_info metric
  // or, for the local cluster, from a ClusterInfoProvider. All fields are plain
  // strings and serialize to JSON under the lowercase keys given in the tags.
  type ClusterInfo struct {
  	// ID is the cluster identifier; ClusterMap stores entries keyed by it.
  	ID string `json:"id"`
  	// Name is the cluster's display name (may be empty).
  	Name string `json:"name"`
  	// Profile comes from the "clusterprofile" metric label or the
  	// "clusterProfile" provider key.
  	Profile string `json:"profile"`
  	// Provider, Account, Project, Region and Provisioner are optional attributes
  	// copied from the identically named label/key; each is "" when absent.
  	Provider    string `json:"provider"`
  	Account     string `json:"account"`
  	Project     string `json:"project"`
  	Region      string `json:"region"`
  	Provisioner string `json:"provisioner"`
  }
  33. // Clone creates a copy of ClusterInfo and returns it
  34. func (ci *ClusterInfo) Clone() *ClusterInfo {
  35. if ci == nil {
  36. return nil
  37. }
  38. return &ClusterInfo{
  39. ID: ci.ID,
  40. Name: ci.Name,
  41. Profile: ci.Profile,
  42. Provider: ci.Provider,
  43. Account: ci.Account,
  44. Project: ci.Project,
  45. Region: ci.Region,
  46. Provisioner: ci.Provisioner,
  47. }
  48. }
  49. type ClusterMap interface {
  50. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  51. GetClusterIDs() []string
  52. // AsMap returns the cluster map as a standard go map
  53. AsMap() map[string]*ClusterInfo
  54. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  55. // doesn't exist
  56. InfoFor(clusterID string) *ClusterInfo
  57. // NameFor returns the name of the cluster provided the clusterID.
  58. NameFor(clusterID string) string
  59. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  60. // assigned name. Otherwise, just the clusterID is returned.
  61. NameIDFor(clusterID string) string
  62. // SplitNameID splits the nameID back into a separate id and name field
  63. SplitNameID(nameID string) (id string, name string)
  64. // StopRefresh stops the automatic internal map refresh
  65. StopRefresh()
  66. }
  // ClusterInfoProvider performs cluster info lookups. PrometheusClusterMap uses
  // it to describe the local cluster when the metrics do not include it.
  type ClusterInfoProvider interface {
  	// GetClusterInfo returns the local/remote connected cluster's attributes as
  	// a string map. PrometheusClusterMap reads the "id", "name", "clusterProfile",
  	// "provider", "account", "project", "region" and "provisioner" keys.
  	GetClusterInfo() map[string]string
  }
  72. // ClusterMap keeps records of all known cost-model clusters.
  73. type PrometheusClusterMap struct {
  74. lock *sync.RWMutex
  75. client prometheus.Client
  76. clusters map[string]*ClusterInfo
  77. clusterInfo ClusterInfoProvider
  78. stop chan struct{}
  79. }
  80. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  81. func NewClusterMap(client prometheus.Client, cip ClusterInfoProvider, refresh time.Duration) ClusterMap {
  82. stop := make(chan struct{})
  83. cm := &PrometheusClusterMap{
  84. lock: new(sync.RWMutex),
  85. client: client,
  86. clusters: make(map[string]*ClusterInfo),
  87. clusterInfo: cip,
  88. stop: stop,
  89. }
  90. // Run an updater to ensure cluster data stays relevant over time
  91. go func() {
  92. // Immediately Attempt to refresh the clusters
  93. cm.refreshClusters()
  94. // Tick on interval and refresh clusters
  95. ticker := time.NewTicker(refresh)
  96. for {
  97. select {
  98. case <-ticker.C:
  99. cm.refreshClusters()
  100. case <-cm.stop:
  101. log.Infof("ClusterMap refresh stopped.")
  102. return
  103. }
  104. }
  105. }()
  106. return cm
  107. }
  // clusterInfoQuery builds the PromQL query for the kubecost_cluster_info
  // metric, appending offset verbatim ("" for no offset; presumably an
  // " offset <duration>" modifier when querying Thanos).
  func clusterInfoQuery(offset string) string {
  	const metricName = "kubecost_cluster_info"
  	return fmt.Sprintf("%s%s", metricName, offset)
  }
  112. // loadClusters loads all the cluster info to map
  113. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
  114. var offset string = ""
  115. if prom.IsThanos(pcm.client) {
  116. offset = thanos.QueryOffset()
  117. }
  118. // Execute Query
  119. tryQuery := func() (interface{}, error) {
  120. ctx := prom.NewNamedContext(pcm.client, prom.ClusterMapContextName)
  121. resCh := ctx.QueryAtTime(clusterInfoQuery(offset), time.Now().Add(-promQueryOffset))
  122. r, e := resCh.Await()
  123. return r, e
  124. }
  125. // Retry on failure
  126. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  127. qr, ok := result.([]*prom.QueryResult)
  128. if !ok || err != nil {
  129. return nil, err
  130. }
  131. clusters := make(map[string]*ClusterInfo)
  132. // Load the query results. Critical fields are id and name.
  133. for _, result := range qr {
  134. id, err := result.GetString("id")
  135. if err != nil {
  136. log.Warnf("Failed to load 'id' field for ClusterInfo")
  137. continue
  138. }
  139. name, err := result.GetString("name")
  140. if err != nil {
  141. log.Warnf("Failed to load 'name' field for ClusterInfo")
  142. continue
  143. }
  144. profile, err := result.GetString("clusterprofile")
  145. if err != nil {
  146. profile = ""
  147. }
  148. provider, err := result.GetString("provider")
  149. if err != nil {
  150. provider = ""
  151. }
  152. account, err := result.GetString("account")
  153. if err != nil {
  154. account = ""
  155. }
  156. project, err := result.GetString("project")
  157. if err != nil {
  158. project = ""
  159. }
  160. region, err := result.GetString("region")
  161. if err != nil {
  162. region = ""
  163. }
  164. provisioner, err := result.GetString("provisioner")
  165. if err != nil {
  166. provisioner = ""
  167. }
  168. clusters[id] = &ClusterInfo{
  169. ID: id,
  170. Name: name,
  171. Profile: profile,
  172. Provider: provider,
  173. Account: account,
  174. Project: project,
  175. Region: region,
  176. Provisioner: provisioner,
  177. }
  178. }
  179. // populate the local cluster if it doesn't exist
  180. localInfo, err := pcm.getLocalClusterInfo()
  181. if err != nil {
  182. return clusters, nil
  183. }
  184. // Check to see if the local cluster's id is part of our loaded clusters, and include if not
  185. if _, ok := clusters[localInfo.ID]; !ok {
  186. clusters[localInfo.ID] = localInfo
  187. }
  188. return clusters, nil
  189. }
  190. // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
  191. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
  192. info := pcm.clusterInfo.GetClusterInfo()
  193. var id string
  194. var name string
  195. if i, ok := info["id"]; ok {
  196. id = i
  197. } else {
  198. return nil, fmt.Errorf("Local Cluster Info Missing ID")
  199. }
  200. if n, ok := info["name"]; ok {
  201. name = n
  202. } else {
  203. return nil, fmt.Errorf("Local Cluster Info Missing Name")
  204. }
  205. var clusterProfile string
  206. var provider string
  207. var account string
  208. var project string
  209. var region string
  210. var provisioner string
  211. if cp, ok := info["clusterProfile"]; ok {
  212. clusterProfile = cp
  213. }
  214. if pvdr, ok := info["provider"]; ok {
  215. provider = pvdr
  216. }
  217. if acct, ok := info["account"]; ok {
  218. account = acct
  219. }
  220. if proj, ok := info["project"]; ok {
  221. project = proj
  222. }
  223. if reg, ok := info["region"]; ok {
  224. region = reg
  225. }
  226. if pvsr, ok := info["provisioner"]; ok {
  227. provisioner = pvsr
  228. }
  229. return &ClusterInfo{
  230. ID: id,
  231. Name: name,
  232. Profile: clusterProfile,
  233. Provider: provider,
  234. Account: account,
  235. Project: project,
  236. Region: region,
  237. Provisioner: provisioner,
  238. }, nil
  239. }
  240. // refreshClusters loads the clusters and updates the internal map
  241. func (pcm *PrometheusClusterMap) refreshClusters() {
  242. updated, err := pcm.loadClusters()
  243. if err != nil {
  244. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  245. return
  246. }
  247. pcm.lock.Lock()
  248. pcm.clusters = updated
  249. pcm.lock.Unlock()
  250. }
  251. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  252. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  253. pcm.lock.RLock()
  254. defer pcm.lock.RUnlock()
  255. var clusterIDs []string
  256. for id := range pcm.clusters {
  257. clusterIDs = append(clusterIDs, id)
  258. }
  259. return clusterIDs
  260. }
  261. // AsMap returns the cluster map as a standard go map
  262. func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
  263. pcm.lock.RLock()
  264. defer pcm.lock.RUnlock()
  265. m := make(map[string]*ClusterInfo)
  266. for k, v := range pcm.clusters {
  267. m[k] = v.Clone()
  268. }
  269. return m
  270. }
  271. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  272. // doesn't exist
  273. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
  274. pcm.lock.RLock()
  275. defer pcm.lock.RUnlock()
  276. if info, ok := pcm.clusters[clusterID]; ok {
  277. return info.Clone()
  278. }
  279. return nil
  280. }
  281. // NameFor returns the name of the cluster provided the clusterID.
  282. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  283. pcm.lock.RLock()
  284. defer pcm.lock.RUnlock()
  285. if info, ok := pcm.clusters[clusterID]; ok {
  286. return info.Name
  287. }
  288. return ""
  289. }
  290. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  291. // assigned name. Otherwise, just the clusterID is returned.
  292. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  293. pcm.lock.RLock()
  294. defer pcm.lock.RUnlock()
  295. if info, ok := pcm.clusters[clusterID]; ok {
  296. if info.Name == "" {
  297. return clusterID
  298. }
  299. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  300. }
  301. return clusterID
  302. }
  303. func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
  304. if !strings.Contains(nameID, "/") {
  305. id = nameID
  306. name = ""
  307. return
  308. }
  309. split := strings.Split(nameID, "/")
  310. name = split[0]
  311. id = split[1]
  312. return
  313. }
  314. // StopRefresh stops the automatic internal map refresh
  315. func (pcm *PrometheusClusterMap) StopRefresh() {
  316. if pcm.stop != nil {
  317. close(pcm.stop)
  318. pcm.stop = nil
  319. }
  320. }