| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- package prom
- import (
- "context"
- "fmt"
- "strings"
- "sync"
- "time"
- "github.com/opencost/opencost/modules/prometheus-source/pkg/env"
- "github.com/opencost/opencost/core/pkg/clusters"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/source"
- "github.com/opencost/opencost/core/pkg/util/retry"
- )
// Retry policy applied to the kubecost_cluster_info query in loadClusters.
const (
	// LoadRetries is the retry count handed to retry.Retry when loading
	// cluster info (whether it counts attempts or re-tries is defined by
	// the retry package — TODO confirm).
	LoadRetries int = 6

	// LoadRetryDelay is the delay handed to retry.Retry between
	// cluster info load attempts.
	LoadRetryDelay time.Duration = 10 * time.Second
)
- // ClusterMap keeps records of all known cost-model clusters.
- type PrometheusClusterMap struct {
- lock sync.RWMutex
- contextFactory *ContextFactory
- clusters map[string]*clusters.ClusterInfo
- clusterInfo clusters.ClusterInfoProvider
- stop chan struct{}
- }
- // newPrometheusClusterMap creates a new ClusterMap implementation using prometheus
- func newPrometheusClusterMap(contextFactory *ContextFactory, cip clusters.ClusterInfoProvider, refresh time.Duration) clusters.ClusterMap {
- stop := make(chan struct{})
- cm := &PrometheusClusterMap{
- contextFactory: contextFactory,
- clusters: make(map[string]*clusters.ClusterInfo),
- clusterInfo: cip,
- stop: stop,
- }
- // Run an updater to ensure cluster data stays relevant over time
- go func() {
- // Immediately Attempt to refresh the clusters
- cm.refreshClusters()
- // Tick on interval and refresh clusters
- ticker := time.NewTicker(refresh)
- for {
- select {
- case <-ticker.C:
- cm.refreshClusters()
- case <-cm.stop:
- log.Infof("ClusterMap refresh stopped.")
- return
- }
- }
- }()
- return cm
- }
- // clusterInfoQuery returns the query string to load cluster info
- func clusterInfoQuery(offset string) string {
- return fmt.Sprintf("kubecost_cluster_info{%s}%s", env.GetPromClusterFilter(), offset)
- }
- // loadClusters loads all the cluster info to map
- func (pcm *PrometheusClusterMap) loadClusters() (map[string]*clusters.ClusterInfo, error) {
- var offset string = ""
- // Execute Query
- tryQuery := func() (interface{}, error) {
- ctx := pcm.contextFactory.NewNamedContext(ClusterMapContextName)
- resCh := ctx.QueryAtTime(clusterInfoQuery(offset), time.Now())
- r, e := resCh.Await()
- return r, e
- }
- // Retry on failure
- result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
- qr, ok := result.([]*source.QueryResult)
- if !ok || err != nil {
- return nil, err
- }
- allClusters := make(map[string]*clusters.ClusterInfo)
- // Load the query results. Critical fields are id and name.
- for _, result := range qr {
- id, err := result.GetString("id")
- if err != nil {
- log.Warnf("Failed to load 'id' field for ClusterInfo")
- continue
- }
- name, err := result.GetString("name")
- if err != nil {
- log.Warnf("Failed to load 'name' field for ClusterInfo")
- continue
- }
- profile, err := result.GetString("clusterprofile")
- if err != nil {
- profile = ""
- }
- provider, err := result.GetString("provider")
- if err != nil {
- provider = ""
- }
- account, err := result.GetString("account")
- if err != nil {
- account = ""
- }
- project, err := result.GetString("project")
- if err != nil {
- project = ""
- }
- region, err := result.GetString("region")
- if err != nil {
- region = ""
- }
- provisioner, err := result.GetString("provisioner")
- if err != nil {
- provisioner = ""
- }
- allClusters[id] = &clusters.ClusterInfo{
- ID: id,
- Name: name,
- Profile: profile,
- Provider: provider,
- Account: account,
- Project: project,
- Region: region,
- Provisioner: provisioner,
- }
- }
- // populate the local cluster if it doesn't exist
- localInfo, err := pcm.getLocalClusterInfo()
- if err != nil {
- return allClusters, nil
- }
- // Check to see if the local cluster's id is part of our loaded clusters, and include if not
- if _, ok := allClusters[localInfo.ID]; !ok {
- allClusters[localInfo.ID] = localInfo
- }
- return allClusters, nil
- }
- // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
- func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*clusters.ClusterInfo, error) {
- info := pcm.clusterInfo.GetClusterInfo()
- clusterInfo, err := clusters.MapToClusterInfo(info)
- if err != nil {
- return nil, fmt.Errorf("parsing local cluster info failed: %w", err)
- }
- return clusterInfo, nil
- }
- // refreshClusters loads the clusters and updates the internal map
- func (pcm *PrometheusClusterMap) refreshClusters() {
- updated, err := pcm.loadClusters()
- if err != nil {
- log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
- return
- }
- pcm.lock.Lock()
- pcm.clusters = updated
- pcm.lock.Unlock()
- }
- // GetClusterIDs returns a slice containing all of the cluster identifiers.
- func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- var clusterIDs []string
- for id := range pcm.clusters {
- clusterIDs = append(clusterIDs, id)
- }
- return clusterIDs
- }
- // AsMap returns the cluster map as a standard go map
- func (pcm *PrometheusClusterMap) AsMap() map[string]*clusters.ClusterInfo {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- m := make(map[string]*clusters.ClusterInfo)
- for k, v := range pcm.clusters {
- m[k] = v.Clone()
- }
- return m
- }
- // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
- // doesn't exist
- func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *clusters.ClusterInfo {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- if info, ok := pcm.clusters[clusterID]; ok {
- return info.Clone()
- }
- return nil
- }
- // NameFor returns the name of the cluster provided the clusterID.
- func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- if info, ok := pcm.clusters[clusterID]; ok {
- return info.Name
- }
- return ""
- }
- // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
- // assigned name. Otherwise, just the clusterID is returned.
- func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- if info, ok := pcm.clusters[clusterID]; ok {
- if info.Name == "" {
- return clusterID
- }
- return fmt.Sprintf("%s/%s", info.Name, clusterID)
- }
- return clusterID
- }
// SplitNameID reverses the "<clusterName>/<clusterID>" format produced by
// NameIDFor, returning the cluster ID and name separately.
//
// Input with no "/" is treated as a bare cluster ID with an empty name. Only
// the first "/" acts as the separator, so any further slashes stay in the
// returned ID rather than being silently dropped.
func SplitNameID(nameID string) (id string, name string) {
	sep := strings.Index(nameID, "/")
	if sep < 0 {
		return nameID, ""
	}
	return nameID[sep+1:], nameID[:sep]
}
- // StopRefresh stops the automatic internal map refresh
- func (pcm *PrometheusClusterMap) StopRefresh() {
- if pcm.stop != nil {
- close(pcm.stop)
- pcm.stop = nil
- }
- }
|