clustermap.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. package clusters
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/opencost/opencost/pkg/env"
  9. "github.com/opencost/opencost/core/pkg/clusters"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/source"
  12. "github.com/opencost/opencost/core/pkg/util/retry"
  13. "github.com/opencost/opencost/pkg/prom"
  14. "github.com/opencost/opencost/pkg/thanos"
  15. prometheus "github.com/prometheus/client_golang/api"
  16. )
  17. const (
  18. LoadRetries int = 6
  19. LoadRetryDelay time.Duration = 10 * time.Second
  20. )
  21. // prometheus query offset to apply to each non-range query
  22. // package scope to prevent calling duration parse each use
  23. var promQueryOffset = env.GetPrometheusQueryOffset()
  24. // ClusterMap keeps records of all known cost-model clusters.
  25. type PrometheusClusterMap struct {
  26. lock sync.RWMutex
  27. client prometheus.Client
  28. clusters map[string]*clusters.ClusterInfo
  29. clusterInfo clusters.ClusterInfoProvider
  30. stop chan struct{}
  31. }
  32. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  33. func NewClusterMap(client prometheus.Client, cip clusters.ClusterInfoProvider, refresh time.Duration) clusters.ClusterMap {
  34. stop := make(chan struct{})
  35. cm := &PrometheusClusterMap{
  36. client: client,
  37. clusters: make(map[string]*clusters.ClusterInfo),
  38. clusterInfo: cip,
  39. stop: stop,
  40. }
  41. // Run an updater to ensure cluster data stays relevant over time
  42. go func() {
  43. // Immediately Attempt to refresh the clusters
  44. cm.refreshClusters()
  45. // Tick on interval and refresh clusters
  46. ticker := time.NewTicker(refresh)
  47. for {
  48. select {
  49. case <-ticker.C:
  50. cm.refreshClusters()
  51. case <-cm.stop:
  52. log.Infof("ClusterMap refresh stopped.")
  53. return
  54. }
  55. }
  56. }()
  57. return cm
  58. }
  59. // clusterInfoQuery returns the query string to load cluster info
  60. func clusterInfoQuery(offset string) string {
  61. return fmt.Sprintf("kubecost_cluster_info{%s}%s", env.GetPromClusterFilter(), offset)
  62. }
  63. // loadClusters loads all the cluster info to map
  64. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*clusters.ClusterInfo, error) {
  65. var offset string = ""
  66. if prom.IsThanos(pcm.client) {
  67. offset = thanos.QueryOffset()
  68. }
  69. // Execute Query
  70. tryQuery := func() (interface{}, error) {
  71. ctx := prom.NewNamedContext(pcm.client, prom.ClusterMapContextName)
  72. resCh := ctx.QueryAtTime(clusterInfoQuery(offset), time.Now().Add(-promQueryOffset))
  73. r, e := resCh.Await()
  74. return r, e
  75. }
  76. // Retry on failure
  77. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  78. qr, ok := result.([]*source.QueryResult)
  79. if !ok || err != nil {
  80. return nil, err
  81. }
  82. allClusters := make(map[string]*clusters.ClusterInfo)
  83. // Load the query results. Critical fields are id and name.
  84. for _, result := range qr {
  85. id, err := result.GetString("id")
  86. if err != nil {
  87. log.Warnf("Failed to load 'id' field for ClusterInfo")
  88. continue
  89. }
  90. name, err := result.GetString("name")
  91. if err != nil {
  92. log.Warnf("Failed to load 'name' field for ClusterInfo")
  93. continue
  94. }
  95. profile, err := result.GetString("clusterprofile")
  96. if err != nil {
  97. profile = ""
  98. }
  99. provider, err := result.GetString("provider")
  100. if err != nil {
  101. provider = ""
  102. }
  103. account, err := result.GetString("account")
  104. if err != nil {
  105. account = ""
  106. }
  107. project, err := result.GetString("project")
  108. if err != nil {
  109. project = ""
  110. }
  111. region, err := result.GetString("region")
  112. if err != nil {
  113. region = ""
  114. }
  115. provisioner, err := result.GetString("provisioner")
  116. if err != nil {
  117. provisioner = ""
  118. }
  119. allClusters[id] = &clusters.ClusterInfo{
  120. ID: id,
  121. Name: name,
  122. Profile: profile,
  123. Provider: provider,
  124. Account: account,
  125. Project: project,
  126. Region: region,
  127. Provisioner: provisioner,
  128. }
  129. }
  130. // populate the local cluster if it doesn't exist
  131. localInfo, err := pcm.getLocalClusterInfo()
  132. if err != nil {
  133. return allClusters, nil
  134. }
  135. // Check to see if the local cluster's id is part of our loaded clusters, and include if not
  136. if _, ok := allClusters[localInfo.ID]; !ok {
  137. allClusters[localInfo.ID] = localInfo
  138. }
  139. return allClusters, nil
  140. }
  141. // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
  142. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*clusters.ClusterInfo, error) {
  143. info := pcm.clusterInfo.GetClusterInfo()
  144. clusterInfo, err := MapToClusterInfo(info)
  145. if err != nil {
  146. return nil, fmt.Errorf("Parsing Local Cluster Info Failed: %s", err)
  147. }
  148. return clusterInfo, nil
  149. }
  150. // refreshClusters loads the clusters and updates the internal map
  151. func (pcm *PrometheusClusterMap) refreshClusters() {
  152. updated, err := pcm.loadClusters()
  153. if err != nil {
  154. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  155. return
  156. }
  157. pcm.lock.Lock()
  158. pcm.clusters = updated
  159. pcm.lock.Unlock()
  160. }
  161. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  162. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  163. pcm.lock.RLock()
  164. defer pcm.lock.RUnlock()
  165. var clusterIDs []string
  166. for id := range pcm.clusters {
  167. clusterIDs = append(clusterIDs, id)
  168. }
  169. return clusterIDs
  170. }
  171. // AsMap returns the cluster map as a standard go map
  172. func (pcm *PrometheusClusterMap) AsMap() map[string]*clusters.ClusterInfo {
  173. pcm.lock.RLock()
  174. defer pcm.lock.RUnlock()
  175. m := make(map[string]*clusters.ClusterInfo)
  176. for k, v := range pcm.clusters {
  177. m[k] = v.Clone()
  178. }
  179. return m
  180. }
  181. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  182. // doesn't exist
  183. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *clusters.ClusterInfo {
  184. pcm.lock.RLock()
  185. defer pcm.lock.RUnlock()
  186. if info, ok := pcm.clusters[clusterID]; ok {
  187. return info.Clone()
  188. }
  189. return nil
  190. }
  191. // NameFor returns the name of the cluster provided the clusterID.
  192. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  193. pcm.lock.RLock()
  194. defer pcm.lock.RUnlock()
  195. if info, ok := pcm.clusters[clusterID]; ok {
  196. return info.Name
  197. }
  198. return ""
  199. }
  200. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  201. // assigned name. Otherwise, just the clusterID is returned.
  202. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  203. pcm.lock.RLock()
  204. defer pcm.lock.RUnlock()
  205. if info, ok := pcm.clusters[clusterID]; ok {
  206. if info.Name == "" {
  207. return clusterID
  208. }
  209. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  210. }
  211. return clusterID
  212. }
  213. // SplitNameID is a helper method that removes the common split format and returns
  214. func SplitNameID(nameID string) (id string, name string) {
  215. if !strings.Contains(nameID, "/") {
  216. id = nameID
  217. name = ""
  218. return
  219. }
  220. split := strings.Split(nameID, "/")
  221. name = split[0]
  222. id = split[1]
  223. return
  224. }
  225. // StopRefresh stops the automatic internal map refresh
  226. func (pcm *PrometheusClusterMap) StopRefresh() {
  227. if pcm.stop != nil {
  228. close(pcm.stop)
  229. pcm.stop = nil
  230. }
  231. }
  232. // MapToClusterInfo returns a ClusterInfo using parsed data from a string map. If
  233. // parsing the map fails for id and/or name, an error is returned.
  234. func MapToClusterInfo(info map[string]string) (*clusters.ClusterInfo, error) {
  235. var id string
  236. var name string
  237. if i, ok := info[clusters.ClusterInfoIdKey]; ok {
  238. id = i
  239. } else {
  240. return nil, fmt.Errorf("Cluster Info Missing ID")
  241. }
  242. if n, ok := info[clusters.ClusterInfoNameKey]; ok {
  243. name = n
  244. } else {
  245. name = id
  246. }
  247. var clusterProfile string
  248. var provider string
  249. var account string
  250. var project string
  251. var region string
  252. var provisioner string
  253. if cp, ok := info[clusters.ClusterInfoProfileKey]; ok {
  254. clusterProfile = cp
  255. }
  256. if pvdr, ok := info[clusters.ClusterInfoProviderKey]; ok {
  257. provider = pvdr
  258. }
  259. if acct, ok := info[clusters.ClusterInfoAccountKey]; ok {
  260. account = acct
  261. }
  262. if proj, ok := info[clusters.ClusterInfoProjectKey]; ok {
  263. project = proj
  264. }
  265. if reg, ok := info[clusters.ClusterInfoRegionKey]; ok {
  266. region = reg
  267. }
  268. if pvsr, ok := info[clusters.ClusterInfoProvisionerKey]; ok {
  269. provisioner = pvsr
  270. }
  271. return &clusters.ClusterInfo{
  272. ID: id,
  273. Name: name,
  274. Profile: clusterProfile,
  275. Provider: provider,
  276. Account: account,
  277. Project: project,
  278. Region: region,
  279. Provisioner: provisioner,
  280. }, nil
  281. }