| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- package clusters
- import (
- "context"
- "fmt"
- "strings"
- "sync"
- "time"
- "github.com/opencost/opencost/pkg/env"
- "github.com/opencost/opencost/pkg/log"
- "github.com/opencost/opencost/pkg/prom"
- "github.com/opencost/opencost/pkg/thanos"
- "github.com/opencost/opencost/pkg/util/retry"
- prometheus "github.com/prometheus/client_golang/api"
- )
// LoadRetries is the retry count handed to retry.Retry when querying for
// cluster info.
const LoadRetries int = 6

// LoadRetryDelay is the delay handed to retry.Retry between cluster info
// query attempts.
const LoadRetryDelay time.Duration = 10 * time.Second
- // prometheus query offset to apply to each non-range query
- // package scope to prevent calling duration parse each use
- var promQueryOffset = env.GetPrometheusQueryOffset()
// ClusterInfo describes one cluster using the attributes carried by the
// kubecost_cluster_info metric (or supplied by the local ClusterInfoProvider).
type ClusterInfo struct {
	// ID is the cluster identifier; ClusterMap entries are keyed by it.
	ID string `json:"id"`
	// Name is the human-readable cluster name.
	Name string `json:"name"`
	// Profile is read from the "clusterprofile" metric label or the local
	// "clusterProfile" key.
	Profile     string `json:"profile"`
	Provider    string `json:"provider"`
	Account     string `json:"account"`
	Project     string `json:"project"`
	Region      string `json:"region"`
	Provisioner string `json:"provisioner"`
}

// Clone returns an independent copy of ci, or nil when ci is nil.
func (ci *ClusterInfo) Clone() *ClusterInfo {
	if ci == nil {
		return nil
	}
	// Every field is a string, so a plain value copy is a complete copy.
	dup := *ci
	return &dup
}
- type ClusterMap interface {
- // GetClusterIDs returns a slice containing all of the cluster identifiers.
- GetClusterIDs() []string
- // AsMap returns the cluster map as a standard go map
- AsMap() map[string]*ClusterInfo
- // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
- // doesn't exist
- InfoFor(clusterID string) *ClusterInfo
- // NameFor returns the name of the cluster provided the clusterID.
- NameFor(clusterID string) string
- // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
- // assigned name. Otherwise, just the clusterID is returned.
- NameIDFor(clusterID string) string
- // SplitNameID splits the nameID back into a separate id and name field
- SplitNameID(nameID string) (id string, name string)
- // StopRefresh stops the automatic internal map refresh
- StopRefresh()
- }
// ClusterInfoProvider performs cluster info lookups.
type ClusterInfoProvider interface {
	// GetClusterInfo returns the local/remote connected cluster info as a
	// string map (keys such as "id", "name", "provider").
	GetClusterInfo() map[string]string
}
- // ClusterMap keeps records of all known cost-model clusters.
- type PrometheusClusterMap struct {
- lock *sync.RWMutex
- client prometheus.Client
- clusters map[string]*ClusterInfo
- clusterInfo ClusterInfoProvider
- stop chan struct{}
- }
- // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
- func NewClusterMap(client prometheus.Client, cip ClusterInfoProvider, refresh time.Duration) ClusterMap {
- stop := make(chan struct{})
- cm := &PrometheusClusterMap{
- lock: new(sync.RWMutex),
- client: client,
- clusters: make(map[string]*ClusterInfo),
- clusterInfo: cip,
- stop: stop,
- }
- // Run an updater to ensure cluster data stays relevant over time
- go func() {
- // Immediately Attempt to refresh the clusters
- cm.refreshClusters()
- // Tick on interval and refresh clusters
- ticker := time.NewTicker(refresh)
- for {
- select {
- case <-ticker.C:
- cm.refreshClusters()
- case <-cm.stop:
- log.Infof("ClusterMap refresh stopped.")
- return
- }
- }
- }()
- return cm
- }
// clusterInfoQuery builds the query that selects the kubecost_cluster_info
// metric. offset is appended verbatim; pass "" for no offset.
func clusterInfoQuery(offset string) string {
	const metricName = "kubecost_cluster_info"
	// Sprint inserts no separator between two string operands.
	return fmt.Sprint(metricName, offset)
}
- // loadClusters loads all the cluster info to map
- func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
- var offset string = ""
- if prom.IsThanos(pcm.client) {
- offset = thanos.QueryOffset()
- }
- // Execute Query
- tryQuery := func() (interface{}, error) {
- ctx := prom.NewNamedContext(pcm.client, prom.ClusterMapContextName)
- resCh := ctx.QueryAtTime(clusterInfoQuery(offset), time.Now().Add(-promQueryOffset))
- r, e := resCh.Await()
- return r, e
- }
- // Retry on failure
- result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
- qr, ok := result.([]*prom.QueryResult)
- if !ok || err != nil {
- return nil, err
- }
- clusters := make(map[string]*ClusterInfo)
- // Load the query results. Critical fields are id and name.
- for _, result := range qr {
- id, err := result.GetString("id")
- if err != nil {
- log.Warnf("Failed to load 'id' field for ClusterInfo")
- continue
- }
- name, err := result.GetString("name")
- if err != nil {
- log.Warnf("Failed to load 'name' field for ClusterInfo")
- continue
- }
- profile, err := result.GetString("clusterprofile")
- if err != nil {
- profile = ""
- }
- provider, err := result.GetString("provider")
- if err != nil {
- provider = ""
- }
- account, err := result.GetString("account")
- if err != nil {
- account = ""
- }
- project, err := result.GetString("project")
- if err != nil {
- project = ""
- }
- region, err := result.GetString("region")
- if err != nil {
- region = ""
- }
- provisioner, err := result.GetString("provisioner")
- if err != nil {
- provisioner = ""
- }
- clusters[id] = &ClusterInfo{
- ID: id,
- Name: name,
- Profile: profile,
- Provider: provider,
- Account: account,
- Project: project,
- Region: region,
- Provisioner: provisioner,
- }
- }
- // populate the local cluster if it doesn't exist
- localInfo, err := pcm.getLocalClusterInfo()
- if err != nil {
- return clusters, nil
- }
- // Check to see if the local cluster's id is part of our loaded clusters, and include if not
- if _, ok := clusters[localInfo.ID]; !ok {
- clusters[localInfo.ID] = localInfo
- }
- return clusters, nil
- }
- // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
- func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
- info := pcm.clusterInfo.GetClusterInfo()
- var id string
- var name string
- if i, ok := info["id"]; ok {
- id = i
- } else {
- return nil, fmt.Errorf("Local Cluster Info Missing ID")
- }
- if n, ok := info["name"]; ok {
- name = n
- } else {
- return nil, fmt.Errorf("Local Cluster Info Missing Name")
- }
- var clusterProfile string
- var provider string
- var account string
- var project string
- var region string
- var provisioner string
- if cp, ok := info["clusterProfile"]; ok {
- clusterProfile = cp
- }
- if pvdr, ok := info["provider"]; ok {
- provider = pvdr
- }
- if acct, ok := info["account"]; ok {
- account = acct
- }
- if proj, ok := info["project"]; ok {
- project = proj
- }
- if reg, ok := info["region"]; ok {
- region = reg
- }
- if pvsr, ok := info["provisioner"]; ok {
- provisioner = pvsr
- }
- return &ClusterInfo{
- ID: id,
- Name: name,
- Profile: clusterProfile,
- Provider: provider,
- Account: account,
- Project: project,
- Region: region,
- Provisioner: provisioner,
- }, nil
- }
- // refreshClusters loads the clusters and updates the internal map
- func (pcm *PrometheusClusterMap) refreshClusters() {
- updated, err := pcm.loadClusters()
- if err != nil {
- log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
- return
- }
- pcm.lock.Lock()
- pcm.clusters = updated
- pcm.lock.Unlock()
- }
- // GetClusterIDs returns a slice containing all of the cluster identifiers.
- func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- var clusterIDs []string
- for id := range pcm.clusters {
- clusterIDs = append(clusterIDs, id)
- }
- return clusterIDs
- }
- // AsMap returns the cluster map as a standard go map
- func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- m := make(map[string]*ClusterInfo)
- for k, v := range pcm.clusters {
- m[k] = v.Clone()
- }
- return m
- }
- // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
- // doesn't exist
- func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- if info, ok := pcm.clusters[clusterID]; ok {
- return info.Clone()
- }
- return nil
- }
- // NameFor returns the name of the cluster provided the clusterID.
- func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- if info, ok := pcm.clusters[clusterID]; ok {
- return info.Name
- }
- return ""
- }
- // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
- // assigned name. Otherwise, just the clusterID is returned.
- func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- if info, ok := pcm.clusters[clusterID]; ok {
- if info.Name == "" {
- return clusterID
- }
- return fmt.Sprintf("%s/%s", info.Name, clusterID)
- }
- return clusterID
- }
- func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
- if !strings.Contains(nameID, "/") {
- id = nameID
- name = ""
- return
- }
- split := strings.Split(nameID, "/")
- name = split[0]
- id = split[1]
- return
- }
- // StopRefresh stops the automatic internal map refresh
- func (pcm *PrometheusClusterMap) StopRefresh() {
- if pcm.stop != nil {
- close(pcm.stop)
- pcm.stop = nil
- }
- }
|