clustermap.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. package clusters
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/opencost/opencost/pkg/env"
  9. "github.com/opencost/opencost/core/pkg/clusters"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/util/retry"
  12. "github.com/opencost/opencost/pkg/prom"
  13. "github.com/opencost/opencost/pkg/thanos"
  14. prometheus "github.com/prometheus/client_golang/api"
  15. )
const (
	// LoadRetries is the retry count handed to retry.Retry when the cluster info
	// query is executed.
	LoadRetries int = 6
	// LoadRetryDelay is the delay handed to retry.Retry alongside LoadRetries.
	LoadRetryDelay time.Duration = 10 * time.Second
)

// promQueryOffset is the prometheus query offset to apply to each non-range query; the
// cluster info query is issued at the current time minus this offset.
// It is resolved once at package scope to prevent calling duration parse each use.
var promQueryOffset = env.GetPrometheusQueryOffset()
// PrometheusClusterMap keeps records of all known cost-model clusters. The records are
// loaded by querying the configured client and are replaced wholesale on each refresh.
type PrometheusClusterMap struct {
	// lock guards clusters: refreshes take the write lock, accessors the read lock.
	lock sync.RWMutex
	// client is the client used to run the cluster info query.
	client prometheus.Client
	// clusters holds the most recently loaded cluster info, keyed by cluster id.
	clusters map[string]*clusters.ClusterInfo
	// clusterInfo supplies the local cluster's info, which is added to the loaded
	// set when the query results do not contain it.
	clusterInfo clusters.ClusterInfoProvider
	// stop is closed by StopRefresh to end the background refresh goroutine.
	stop chan struct{}
}
  31. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  32. func NewClusterMap(client prometheus.Client, cip clusters.ClusterInfoProvider, refresh time.Duration) clusters.ClusterMap {
  33. stop := make(chan struct{})
  34. cm := &PrometheusClusterMap{
  35. client: client,
  36. clusters: make(map[string]*clusters.ClusterInfo),
  37. clusterInfo: cip,
  38. stop: stop,
  39. }
  40. // Run an updater to ensure cluster data stays relevant over time
  41. go func() {
  42. // Immediately Attempt to refresh the clusters
  43. cm.refreshClusters()
  44. // Tick on interval and refresh clusters
  45. ticker := time.NewTicker(refresh)
  46. for {
  47. select {
  48. case <-ticker.C:
  49. cm.refreshClusters()
  50. case <-cm.stop:
  51. log.Infof("ClusterMap refresh stopped.")
  52. return
  53. }
  54. }
  55. }()
  56. return cm
  57. }
  58. // clusterInfoQuery returns the query string to load cluster info
  59. func clusterInfoQuery(offset string) string {
  60. return fmt.Sprintf("kubecost_cluster_info{%s}%s", env.GetPromClusterFilter(), offset)
  61. }
  62. // loadClusters loads all the cluster info to map
  63. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*clusters.ClusterInfo, error) {
  64. var offset string = ""
  65. if prom.IsThanos(pcm.client) {
  66. offset = thanos.QueryOffset()
  67. }
  68. // Execute Query
  69. tryQuery := func() (interface{}, error) {
  70. ctx := prom.NewNamedContext(pcm.client, prom.ClusterMapContextName)
  71. resCh := ctx.QueryAtTime(clusterInfoQuery(offset), time.Now().Add(-promQueryOffset))
  72. r, e := resCh.Await()
  73. return r, e
  74. }
  75. // Retry on failure
  76. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  77. qr, ok := result.([]*prom.QueryResult)
  78. if !ok || err != nil {
  79. return nil, err
  80. }
  81. allClusters := make(map[string]*clusters.ClusterInfo)
  82. // Load the query results. Critical fields are id and name.
  83. for _, result := range qr {
  84. id, err := result.GetString("id")
  85. if err != nil {
  86. log.Warnf("Failed to load 'id' field for ClusterInfo")
  87. continue
  88. }
  89. name, err := result.GetString("name")
  90. if err != nil {
  91. log.Warnf("Failed to load 'name' field for ClusterInfo")
  92. continue
  93. }
  94. profile, err := result.GetString("clusterprofile")
  95. if err != nil {
  96. profile = ""
  97. }
  98. provider, err := result.GetString("provider")
  99. if err != nil {
  100. provider = ""
  101. }
  102. account, err := result.GetString("account")
  103. if err != nil {
  104. account = ""
  105. }
  106. project, err := result.GetString("project")
  107. if err != nil {
  108. project = ""
  109. }
  110. region, err := result.GetString("region")
  111. if err != nil {
  112. region = ""
  113. }
  114. provisioner, err := result.GetString("provisioner")
  115. if err != nil {
  116. provisioner = ""
  117. }
  118. allClusters[id] = &clusters.ClusterInfo{
  119. ID: id,
  120. Name: name,
  121. Profile: profile,
  122. Provider: provider,
  123. Account: account,
  124. Project: project,
  125. Region: region,
  126. Provisioner: provisioner,
  127. }
  128. }
  129. // populate the local cluster if it doesn't exist
  130. localInfo, err := pcm.getLocalClusterInfo()
  131. if err != nil {
  132. return allClusters, nil
  133. }
  134. // Check to see if the local cluster's id is part of our loaded clusters, and include if not
  135. if _, ok := allClusters[localInfo.ID]; !ok {
  136. allClusters[localInfo.ID] = localInfo
  137. }
  138. return allClusters, nil
  139. }
  140. // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
  141. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*clusters.ClusterInfo, error) {
  142. info := pcm.clusterInfo.GetClusterInfo()
  143. clusterInfo, err := MapToClusterInfo(info)
  144. if err != nil {
  145. return nil, fmt.Errorf("Parsing Local Cluster Info Failed: %s", err)
  146. }
  147. return clusterInfo, nil
  148. }
  149. // refreshClusters loads the clusters and updates the internal map
  150. func (pcm *PrometheusClusterMap) refreshClusters() {
  151. updated, err := pcm.loadClusters()
  152. if err != nil {
  153. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  154. return
  155. }
  156. pcm.lock.Lock()
  157. pcm.clusters = updated
  158. pcm.lock.Unlock()
  159. }
  160. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  161. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  162. pcm.lock.RLock()
  163. defer pcm.lock.RUnlock()
  164. var clusterIDs []string
  165. for id := range pcm.clusters {
  166. clusterIDs = append(clusterIDs, id)
  167. }
  168. return clusterIDs
  169. }
  170. // AsMap returns the cluster map as a standard go map
  171. func (pcm *PrometheusClusterMap) AsMap() map[string]*clusters.ClusterInfo {
  172. pcm.lock.RLock()
  173. defer pcm.lock.RUnlock()
  174. m := make(map[string]*clusters.ClusterInfo)
  175. for k, v := range pcm.clusters {
  176. m[k] = v.Clone()
  177. }
  178. return m
  179. }
  180. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  181. // doesn't exist
  182. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *clusters.ClusterInfo {
  183. pcm.lock.RLock()
  184. defer pcm.lock.RUnlock()
  185. if info, ok := pcm.clusters[clusterID]; ok {
  186. return info.Clone()
  187. }
  188. return nil
  189. }
  190. // NameFor returns the name of the cluster provided the clusterID.
  191. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  192. pcm.lock.RLock()
  193. defer pcm.lock.RUnlock()
  194. if info, ok := pcm.clusters[clusterID]; ok {
  195. return info.Name
  196. }
  197. return ""
  198. }
  199. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  200. // assigned name. Otherwise, just the clusterID is returned.
  201. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  202. pcm.lock.RLock()
  203. defer pcm.lock.RUnlock()
  204. if info, ok := pcm.clusters[clusterID]; ok {
  205. if info.Name == "" {
  206. return clusterID
  207. }
  208. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  209. }
  210. return clusterID
  211. }
  212. // SplitNameID is a helper method that removes the common split format and returns
  213. func SplitNameID(nameID string) (id string, name string) {
  214. if !strings.Contains(nameID, "/") {
  215. id = nameID
  216. name = ""
  217. return
  218. }
  219. split := strings.Split(nameID, "/")
  220. name = split[0]
  221. id = split[1]
  222. return
  223. }
  224. // StopRefresh stops the automatic internal map refresh
  225. func (pcm *PrometheusClusterMap) StopRefresh() {
  226. if pcm.stop != nil {
  227. close(pcm.stop)
  228. pcm.stop = nil
  229. }
  230. }
  231. // MapToClusterInfo returns a ClusterInfo using parsed data from a string map. If
  232. // parsing the map fails for id and/or name, an error is returned.
  233. func MapToClusterInfo(info map[string]string) (*clusters.ClusterInfo, error) {
  234. var id string
  235. var name string
  236. if i, ok := info[clusters.ClusterInfoIdKey]; ok {
  237. id = i
  238. } else {
  239. return nil, fmt.Errorf("Cluster Info Missing ID")
  240. }
  241. if n, ok := info[clusters.ClusterInfoNameKey]; ok {
  242. name = n
  243. } else {
  244. name = id
  245. }
  246. var clusterProfile string
  247. var provider string
  248. var account string
  249. var project string
  250. var region string
  251. var provisioner string
  252. if cp, ok := info[clusters.ClusterInfoProfileKey]; ok {
  253. clusterProfile = cp
  254. }
  255. if pvdr, ok := info[clusters.ClusterInfoProviderKey]; ok {
  256. provider = pvdr
  257. }
  258. if acct, ok := info[clusters.ClusterInfoAccountKey]; ok {
  259. account = acct
  260. }
  261. if proj, ok := info[clusters.ClusterInfoProjectKey]; ok {
  262. project = proj
  263. }
  264. if reg, ok := info[clusters.ClusterInfoRegionKey]; ok {
  265. region = reg
  266. }
  267. if pvsr, ok := info[clusters.ClusterInfoProvisionerKey]; ok {
  268. provisioner = pvsr
  269. }
  270. return &clusters.ClusterInfo{
  271. ID: id,
  272. Name: name,
  273. Profile: clusterProfile,
  274. Provider: provider,
  275. Account: account,
  276. Project: project,
  277. Region: region,
  278. Provisioner: provisioner,
  279. }, nil
  280. }