clustermap.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. package clusters
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/opencost/opencost/pkg/env"
  9. "github.com/opencost/opencost/pkg/log"
  10. "github.com/opencost/opencost/pkg/prom"
  11. "github.com/opencost/opencost/pkg/thanos"
  12. "github.com/opencost/opencost/pkg/util/retry"
  13. prometheus "github.com/prometheus/client_golang/api"
  14. )
  15. const (
  16. LoadRetries int = 6
  17. LoadRetryDelay time.Duration = 10 * time.Second
  18. )
  19. // The following constants are used as keys into the cluster info map data structure
  20. const (
  21. ClusterInfoIdKey = "id"
  22. ClusterInfoNameKey = "name"
  23. ClusterInfoProviderKey = "provider"
  24. ClusterInfoProjectKey = "project"
  25. ClusterInfoAccountKey = "account"
  26. ClusterInfoRegionKey = "region"
  27. ClusterInfoProvisionerKey = "provisioner"
  28. ClusterInfoProfileKey = "clusterProfile"
  29. ClusterInfoLogCollectionKey = "logCollection"
  30. ClusterInfoProductAnalyticsKey = "productAnalytics"
  31. ClusterInfoErrorReportingKey = "errorReporting"
  32. ClusterInfoValuesReportingKey = "valuesReporting"
  33. ClusterInfoThanosEnabledKey = "thanosEnabled"
  34. ClusterInfoThanosOffsetKey = "thanosOffset"
  35. ClusterInfoVersionKey = "version"
  36. )
  37. // prometheus query offset to apply to each non-range query
  38. // package scope to prevent calling duration parse each use
  39. var promQueryOffset = env.GetPrometheusQueryOffset()
// ClusterInfo holds attributes of a cluster. Entries are built either from
// the kubecost_cluster_info metric pulled from Prometheus (loadClusters) or
// from a ClusterInfoProvider's string map (MapToClusterInfo).
//
// ID is always set and is the key under which PrometheusClusterMap stores the
// entry. The remaining attributes are left empty when the source does not
// report them.
type ClusterInfo struct {
	// ID is the cluster identifier.
	ID string `json:"id"`
	// Name is the cluster's display name. Cluster info maps that omit the
	// name fall back to the ID (see MapToClusterInfo).
	Name string `json:"name"`
	// Profile comes from the "clusterprofile" metric label or the
	// ClusterInfoProfileKey map entry.
	Profile     string `json:"profile"`
	Provider    string `json:"provider"`
	Account     string `json:"account"`
	Project     string `json:"project"`
	Region      string `json:"region"`
	Provisioner string `json:"provisioner"`
}
  51. // Clone creates a copy of ClusterInfo and returns it
  52. func (ci *ClusterInfo) Clone() *ClusterInfo {
  53. if ci == nil {
  54. return nil
  55. }
  56. return &ClusterInfo{
  57. ID: ci.ID,
  58. Name: ci.Name,
  59. Profile: ci.Profile,
  60. Provider: ci.Provider,
  61. Account: ci.Account,
  62. Project: ci.Project,
  63. Region: ci.Region,
  64. Provisioner: ci.Provisioner,
  65. }
  66. }
  67. type ClusterMap interface {
  68. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  69. GetClusterIDs() []string
  70. // AsMap returns the cluster map as a standard go map
  71. AsMap() map[string]*ClusterInfo
  72. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  73. // doesn't exist
  74. InfoFor(clusterID string) *ClusterInfo
  75. // NameFor returns the name of the cluster provided the clusterID.
  76. NameFor(clusterID string) string
  77. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  78. // assigned name. Otherwise, just the clusterID is returned.
  79. NameIDFor(clusterID string) string
  80. }
// ClusterInfoProvider is a contract which is capable of performing cluster info lookups.
type ClusterInfoProvider interface {
	// GetClusterInfo returns a string map containing the local/remote connected
	// cluster info. PrometheusClusterMap parses this map with
	// MapToClusterInfo, so it is expected to be keyed by the ClusterInfo*Key
	// constants and must contain ClusterInfoIdKey for parsing to succeed.
	GetClusterInfo() map[string]string
}
// PrometheusClusterMap is a ClusterMap that keeps records of all known
// cost-model clusters. The goroutine started by NewClusterMap reloads them
// from Prometheus/Thanos periodically.
type PrometheusClusterMap struct {
	// lock guards clusters: refreshClusters swaps the map under the write
	// lock, and the accessor methods read it under the read lock.
	lock sync.RWMutex
	// client is the Prometheus or Thanos client used for the cluster info
	// query.
	client prometheus.Client
	// clusters maps cluster ID to its ClusterInfo.
	clusters map[string]*ClusterInfo
	// clusterInfo supplies the local cluster's info. loadClusters adds that
	// entry when it is missing from the query results.
	clusterInfo ClusterInfoProvider
	// stop is closed by StopRefresh to end the background refresh loop.
	stop chan struct{}
}
  94. // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
  95. func NewClusterMap(client prometheus.Client, cip ClusterInfoProvider, refresh time.Duration) ClusterMap {
  96. stop := make(chan struct{})
  97. cm := &PrometheusClusterMap{
  98. client: client,
  99. clusters: make(map[string]*ClusterInfo),
  100. clusterInfo: cip,
  101. stop: stop,
  102. }
  103. // Run an updater to ensure cluster data stays relevant over time
  104. go func() {
  105. // Immediately Attempt to refresh the clusters
  106. cm.refreshClusters()
  107. // Tick on interval and refresh clusters
  108. ticker := time.NewTicker(refresh)
  109. for {
  110. select {
  111. case <-ticker.C:
  112. cm.refreshClusters()
  113. case <-cm.stop:
  114. log.Infof("ClusterMap refresh stopped.")
  115. return
  116. }
  117. }
  118. }()
  119. return cm
  120. }
  121. // clusterInfoQuery returns the query string to load cluster info
  122. func clusterInfoQuery(offset string) string {
  123. return fmt.Sprintf("kubecost_cluster_info{%s}%s", env.GetPromClusterFilter(), offset)
  124. }
  125. // loadClusters loads all the cluster info to map
  126. func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
  127. var offset string = ""
  128. if prom.IsThanos(pcm.client) {
  129. offset = thanos.QueryOffset()
  130. }
  131. // Execute Query
  132. tryQuery := func() (interface{}, error) {
  133. ctx := prom.NewNamedContext(pcm.client, prom.ClusterMapContextName)
  134. resCh := ctx.QueryAtTime(clusterInfoQuery(offset), time.Now().Add(-promQueryOffset))
  135. r, e := resCh.Await()
  136. return r, e
  137. }
  138. // Retry on failure
  139. result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
  140. qr, ok := result.([]*prom.QueryResult)
  141. if !ok || err != nil {
  142. return nil, err
  143. }
  144. clusters := make(map[string]*ClusterInfo)
  145. // Load the query results. Critical fields are id and name.
  146. for _, result := range qr {
  147. id, err := result.GetString("id")
  148. if err != nil {
  149. log.Warnf("Failed to load 'id' field for ClusterInfo")
  150. continue
  151. }
  152. name, err := result.GetString("name")
  153. if err != nil {
  154. log.Warnf("Failed to load 'name' field for ClusterInfo")
  155. continue
  156. }
  157. profile, err := result.GetString("clusterprofile")
  158. if err != nil {
  159. profile = ""
  160. }
  161. provider, err := result.GetString("provider")
  162. if err != nil {
  163. provider = ""
  164. }
  165. account, err := result.GetString("account")
  166. if err != nil {
  167. account = ""
  168. }
  169. project, err := result.GetString("project")
  170. if err != nil {
  171. project = ""
  172. }
  173. region, err := result.GetString("region")
  174. if err != nil {
  175. region = ""
  176. }
  177. provisioner, err := result.GetString("provisioner")
  178. if err != nil {
  179. provisioner = ""
  180. }
  181. clusters[id] = &ClusterInfo{
  182. ID: id,
  183. Name: name,
  184. Profile: profile,
  185. Provider: provider,
  186. Account: account,
  187. Project: project,
  188. Region: region,
  189. Provisioner: provisioner,
  190. }
  191. }
  192. // populate the local cluster if it doesn't exist
  193. localInfo, err := pcm.getLocalClusterInfo()
  194. if err != nil {
  195. return clusters, nil
  196. }
  197. // Check to see if the local cluster's id is part of our loaded clusters, and include if not
  198. if _, ok := clusters[localInfo.ID]; !ok {
  199. clusters[localInfo.ID] = localInfo
  200. }
  201. return clusters, nil
  202. }
  203. // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
  204. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
  205. info := pcm.clusterInfo.GetClusterInfo()
  206. clusterInfo, err := MapToClusterInfo(info)
  207. if err != nil {
  208. return nil, fmt.Errorf("Parsing Local Cluster Info Failed: %s", err)
  209. }
  210. return clusterInfo, nil
  211. }
  212. // refreshClusters loads the clusters and updates the internal map
  213. func (pcm *PrometheusClusterMap) refreshClusters() {
  214. updated, err := pcm.loadClusters()
  215. if err != nil {
  216. log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
  217. return
  218. }
  219. pcm.lock.Lock()
  220. pcm.clusters = updated
  221. pcm.lock.Unlock()
  222. }
  223. // GetClusterIDs returns a slice containing all of the cluster identifiers.
  224. func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
  225. pcm.lock.RLock()
  226. defer pcm.lock.RUnlock()
  227. var clusterIDs []string
  228. for id := range pcm.clusters {
  229. clusterIDs = append(clusterIDs, id)
  230. }
  231. return clusterIDs
  232. }
  233. // AsMap returns the cluster map as a standard go map
  234. func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
  235. pcm.lock.RLock()
  236. defer pcm.lock.RUnlock()
  237. m := make(map[string]*ClusterInfo)
  238. for k, v := range pcm.clusters {
  239. m[k] = v.Clone()
  240. }
  241. return m
  242. }
  243. // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
  244. // doesn't exist
  245. func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
  246. pcm.lock.RLock()
  247. defer pcm.lock.RUnlock()
  248. if info, ok := pcm.clusters[clusterID]; ok {
  249. return info.Clone()
  250. }
  251. return nil
  252. }
  253. // NameFor returns the name of the cluster provided the clusterID.
  254. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
  255. pcm.lock.RLock()
  256. defer pcm.lock.RUnlock()
  257. if info, ok := pcm.clusters[clusterID]; ok {
  258. return info.Name
  259. }
  260. return ""
  261. }
  262. // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
  263. // assigned name. Otherwise, just the clusterID is returned.
  264. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
  265. pcm.lock.RLock()
  266. defer pcm.lock.RUnlock()
  267. if info, ok := pcm.clusters[clusterID]; ok {
  268. if info.Name == "" {
  269. return clusterID
  270. }
  271. return fmt.Sprintf("%s/%s", info.Name, clusterID)
  272. }
  273. return clusterID
  274. }
  275. // SplitNameID is a helper method that removes the common split format and returns
  276. func SplitNameID(nameID string) (id string, name string) {
  277. if !strings.Contains(nameID, "/") {
  278. id = nameID
  279. name = ""
  280. return
  281. }
  282. split := strings.Split(nameID, "/")
  283. name = split[0]
  284. id = split[1]
  285. return
  286. }
  287. // StopRefresh stops the automatic internal map refresh
  288. func (pcm *PrometheusClusterMap) StopRefresh() {
  289. if pcm.stop != nil {
  290. close(pcm.stop)
  291. pcm.stop = nil
  292. }
  293. }
  294. // MapToClusterInfo returns a ClusterInfo using parsed data from a string map. If
  295. // parsing the map fails for id and/or name, an error is returned.
  296. func MapToClusterInfo(info map[string]string) (*ClusterInfo, error) {
  297. var id string
  298. var name string
  299. if i, ok := info[ClusterInfoIdKey]; ok {
  300. id = i
  301. } else {
  302. return nil, fmt.Errorf("Cluster Info Missing ID")
  303. }
  304. if n, ok := info[ClusterInfoNameKey]; ok {
  305. name = n
  306. } else {
  307. name = id
  308. }
  309. var clusterProfile string
  310. var provider string
  311. var account string
  312. var project string
  313. var region string
  314. var provisioner string
  315. if cp, ok := info[ClusterInfoProfileKey]; ok {
  316. clusterProfile = cp
  317. }
  318. if pvdr, ok := info[ClusterInfoProviderKey]; ok {
  319. provider = pvdr
  320. }
  321. if acct, ok := info[ClusterInfoAccountKey]; ok {
  322. account = acct
  323. }
  324. if proj, ok := info[ClusterInfoProjectKey]; ok {
  325. project = proj
  326. }
  327. if reg, ok := info[ClusterInfoRegionKey]; ok {
  328. region = reg
  329. }
  330. if pvsr, ok := info[ClusterInfoProvisionerKey]; ok {
  331. provisioner = pvsr
  332. }
  333. return &ClusterInfo{
  334. ID: id,
  335. Name: name,
  336. Profile: clusterProfile,
  337. Provider: provider,
  338. Account: account,
  339. Project: project,
  340. Region: region,
  341. Provisioner: provisioner,
  342. }, nil
  343. }