package prom import ( "context" "fmt" "strings" "sync" "time" "github.com/opencost/opencost/modules/prometheus-source/pkg/env" "github.com/opencost/opencost/core/pkg/clusters" "github.com/opencost/opencost/core/pkg/log" "github.com/opencost/opencost/core/pkg/source" "github.com/opencost/opencost/core/pkg/util/retry" ) const ( LoadRetries int = 6 LoadRetryDelay time.Duration = 10 * time.Second ) // ClusterMap keeps records of all known cost-model clusters. type PrometheusClusterMap struct { lock sync.RWMutex contextFactory *ContextFactory clusters map[string]*clusters.ClusterInfo clusterInfo clusters.ClusterInfoProvider stop chan struct{} } // newPrometheusClusterMap creates a new ClusterMap implementation using prometheus func newPrometheusClusterMap(contextFactory *ContextFactory, cip clusters.ClusterInfoProvider, refresh time.Duration) clusters.ClusterMap { stop := make(chan struct{}) cm := &PrometheusClusterMap{ contextFactory: contextFactory, clusters: make(map[string]*clusters.ClusterInfo), clusterInfo: cip, stop: stop, } // Run an updater to ensure cluster data stays relevant over time go func() { // Immediately Attempt to refresh the clusters cm.refreshClusters() // Tick on interval and refresh clusters ticker := time.NewTicker(refresh) for { select { case <-ticker.C: cm.refreshClusters() case <-cm.stop: log.Infof("ClusterMap refresh stopped.") return } } }() return cm } // clusterInfoQuery returns the query string to load cluster info func clusterInfoQuery(offset string) string { return fmt.Sprintf("kubecost_cluster_info{%s}%s", env.GetPromClusterFilter(), offset) } // loadClusters loads all the cluster info to map func (pcm *PrometheusClusterMap) loadClusters() (map[string]*clusters.ClusterInfo, error) { var offset string = "" // Execute Query tryQuery := func() (interface{}, error) { ctx := pcm.contextFactory.NewNamedContext(ClusterMapContextName) resCh := ctx.QueryAtTime(clusterInfoQuery(offset), time.Now()) r, e := resCh.Await() return r, e } // Retry on failure result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay) qr, ok := result.([]*source.QueryResult) if !ok || err != nil { return nil, err } allClusters := make(map[string]*clusters.ClusterInfo) // Load the query results. Critical fields are id and name. for _, result := range qr { id, err := result.GetString("id") if err != nil { log.Warnf("Failed to load 'id' field for ClusterInfo") continue } name, err := result.GetString("name") if err != nil { log.Warnf("Failed to load 'name' field for ClusterInfo") continue } profile, err := result.GetString("clusterprofile") if err != nil { profile = "" } provider, err := result.GetString("provider") if err != nil { provider = "" } account, err := result.GetString("account") if err != nil { account = "" } project, err := result.GetString("project") if err != nil { project = "" } region, err := result.GetString("region") if err != nil { region = "" } provisioner, err := result.GetString("provisioner") if err != nil { provisioner = "" } allClusters[id] = &clusters.ClusterInfo{ ID: id, Name: name, Profile: profile, Provider: provider, Account: account, Project: project, Region: region, Provisioner: provisioner, } } // populate the local cluster if it doesn't exist localInfo, err := pcm.getLocalClusterInfo() if err != nil { return allClusters, nil } // Check to see if the local cluster's id is part of our loaded clusters, and include if not if _, ok := allClusters[localInfo.ID]; !ok { allClusters[localInfo.ID] = localInfo } return allClusters, nil } // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available. func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*clusters.ClusterInfo, error) { info := pcm.clusterInfo.GetClusterInfo() clusterInfo, err := clusters.MapToClusterInfo(info) if err != nil { return nil, fmt.Errorf("parsing local cluster info failed: %w", err) } return clusterInfo, nil } // refreshClusters loads the clusters and updates the internal map func (pcm *PrometheusClusterMap) refreshClusters() { updated, err := pcm.loadClusters() if err != nil { log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries) return } pcm.lock.Lock() pcm.clusters = updated pcm.lock.Unlock() } // GetClusterIDs returns a slice containing all of the cluster identifiers. func (pcm *PrometheusClusterMap) GetClusterIDs() []string { pcm.lock.RLock() defer pcm.lock.RUnlock() var clusterIDs []string for id := range pcm.clusters { clusterIDs = append(clusterIDs, id) } return clusterIDs } // AsMap returns the cluster map as a standard go map func (pcm *PrometheusClusterMap) AsMap() map[string]*clusters.ClusterInfo { pcm.lock.RLock() defer pcm.lock.RUnlock() m := make(map[string]*clusters.ClusterInfo) for k, v := range pcm.clusters { m[k] = v.Clone() } return m } // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it // doesn't exist func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *clusters.ClusterInfo { pcm.lock.RLock() defer pcm.lock.RUnlock() if info, ok := pcm.clusters[clusterID]; ok { return info.Clone() } return nil } // NameFor returns the name of the cluster provided the clusterID. func (pcm *PrometheusClusterMap) NameFor(clusterID string) string { pcm.lock.RLock() defer pcm.lock.RUnlock() if info, ok := pcm.clusters[clusterID]; ok { return info.Name } return "" } // NameIDFor returns an identifier in the format "/" if the cluster has an // assigned name. Otherwise, just the clusterID is returned. func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string { pcm.lock.RLock() defer pcm.lock.RUnlock() if info, ok := pcm.clusters[clusterID]; ok { if info.Name == "" { return clusterID } return fmt.Sprintf("%s/%s", info.Name, clusterID) } return clusterID } // SplitNameID is a helper method that removes the common split format and returns func SplitNameID(nameID string) (id string, name string) { if !strings.Contains(nameID, "/") { id = nameID name = "" return } split := strings.Split(nameID, "/") name = split[0] id = split[1] return } // StopRefresh stops the automatic internal map refresh func (pcm *PrometheusClusterMap) StopRefresh() { if pcm.stop != nil { close(pcm.stop) pcm.stop = nil } }