| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- package clusters
- import (
- "fmt"
- "math/rand"
- "strings"
- "sync"
- "time"
- "github.com/kubecost/cost-model/pkg/log"
- "github.com/kubecost/cost-model/pkg/prom"
- "github.com/kubecost/cost-model/pkg/thanos"
- prometheus "github.com/prometheus/client_golang/api"
- )
const (
	// LoadRetryDelay is the initial time the cluster info loader waits after a
	// failed query. The wait grows by a random jitter after each further failure.
	LoadRetryDelay time.Duration = 10 * time.Second

	// LoadRetries is the maximum number of times the cluster info query is
	// attempted before the load is reported as failed.
	LoadRetries int = 6
)
// ClusterInfo holds the identifying metadata of a single cost-model cluster.
//
// PrometheusClusterMap fills it from the labels of the kubecost_cluster_info
// metric: ID and Name come from the required "id" and "name" labels, while
// Profile ("clusterprofile"), Provider ("provider") and Provisioner
// ("provisioner") are optional and are left empty when the label is absent.
type ClusterInfo struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Profile     string `json:"profile"`
	Provider    string `json:"provider"`
	Provisioner string `json:"provisioner"`
}
- // Clone creates a copy of ClusterInfo and returns it
- func (ci *ClusterInfo) Clone() *ClusterInfo {
- if ci == nil {
- return nil
- }
- return &ClusterInfo{
- ID: ci.ID,
- Name: ci.Name,
- Profile: ci.Profile,
- Provider: ci.Provider,
- Provisioner: ci.Provisioner,
- }
- }
- type ClusterMap interface {
- // GetClusterIDs returns a slice containing all of the cluster identifiers.
- GetClusterIDs() []string
- // AsMap returns the cluster map as a standard go map
- AsMap() map[string]*ClusterInfo
- // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
- // doesn't exist
- InfoFor(clusterID string) *ClusterInfo
- // NameFor returns the name of the cluster provided the clusterID.
- NameFor(clusterID string) string
- // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
- // assigned name. Otherwise, just the clusterID is returned.
- NameIDFor(clusterID string) string
- // SplitNameID splits the nameID back into a separate id and name field
- SplitNameID(nameID string) (id string, name string)
- // StopRefresh stops the automatic internal map refresh
- StopRefresh()
- }
- // ClusterMap keeps records of all known cost-model clusters.
- type PrometheusClusterMap struct {
- lock *sync.RWMutex
- client prometheus.Client
- clusters map[string]*ClusterInfo
- stop chan struct{}
- }
- // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
- func NewClusterMap(client prometheus.Client, refresh time.Duration) ClusterMap {
- stop := make(chan struct{})
- cm := &PrometheusClusterMap{
- lock: new(sync.RWMutex),
- client: client,
- clusters: make(map[string]*ClusterInfo),
- stop: stop,
- }
- // Run an updater to ensure cluster data stays relevant over time
- go func() {
- // Immediately Attempt to refresh the clusters
- cm.refreshClusters()
- // Tick on interval and refresh clusters
- ticker := time.NewTicker(refresh)
- for {
- select {
- case <-ticker.C:
- cm.refreshClusters()
- case <-cm.stop:
- log.Infof("ClusterMap refresh stopped.")
- return
- }
- }
- }()
- return cm
- }
// clusterInfoQuery builds the query for the kubecost_cluster_info metric,
// appending offset verbatim (for example a Thanos query offset, or "" for none).
func clusterInfoQuery(offset string) string {
	const metric = "kubecost_cluster_info"
	return fmt.Sprintf("%s%s", metric, offset)
}
- // loadClusters loads all the cluster info to map
- func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
- var offset string = ""
- if prom.IsThanos(pcm.client) {
- offset = thanos.QueryOffset()
- }
- // Execute Query
- tryQuery := func() ([]*prom.QueryResult, prometheus.Warnings, error) {
- ctx := prom.NewContext(pcm.client)
- return ctx.QuerySync(clusterInfoQuery(offset))
- }
- var qr []*prom.QueryResult
- var err error
- // Retry on failure
- delay := LoadRetryDelay
- for r := LoadRetries; r > 0; r-- {
- qr, _, err = tryQuery()
- // non-error breaks out of loop
- if err == nil {
- break
- }
- // wait the delay
- time.Sleep(delay)
- // add some random backoff
- jitter := time.Duration(rand.Int63n(int64(delay)))
- delay = delay + jitter/2
- }
- if err != nil {
- return nil, err
- }
- clusters := make(map[string]*ClusterInfo)
- // Load the query results. Critical fields are id and name.
- for _, result := range qr {
- id, err := result.GetString("id")
- if err != nil {
- log.Warningf("Failed to load 'id' field for ClusterInfo")
- continue
- }
- name, err := result.GetString("name")
- if err != nil {
- log.Warningf("Failed to load 'name' field for ClusterInfo")
- continue
- }
- profile, err := result.GetString("clusterprofile")
- if err != nil {
- profile = ""
- }
- provider, err := result.GetString("provider")
- if err != nil {
- provider = ""
- }
- provisioner, err := result.GetString("provisioner")
- if err != nil {
- provisioner = ""
- }
- clusters[id] = &ClusterInfo{
- ID: id,
- Name: name,
- Profile: profile,
- Provider: provider,
- Provisioner: provisioner,
- }
- }
- return clusters, nil
- }
- // refreshClusters loads the clusters and updates the internal map
- func (pcm *PrometheusClusterMap) refreshClusters() {
- updated, err := pcm.loadClusters()
- if err != nil {
- log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
- return
- }
- pcm.lock.Lock()
- pcm.clusters = updated
- pcm.lock.Unlock()
- }
- // GetClusterIDs returns a slice containing all of the cluster identifiers.
- func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- var clusterIDs []string
- for id := range pcm.clusters {
- clusterIDs = append(clusterIDs, id)
- }
- return clusterIDs
- }
- // AsMap returns the cluster map as a standard go map
- func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- m := make(map[string]*ClusterInfo)
- for k, v := range pcm.clusters {
- m[k] = v.Clone()
- }
- return m
- }
- // InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
- // doesn't exist
- func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- if info, ok := pcm.clusters[clusterID]; ok {
- return info.Clone()
- }
- return nil
- }
- // NameFor returns the name of the cluster provided the clusterID.
- func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- if info, ok := pcm.clusters[clusterID]; ok {
- return info.Name
- }
- return ""
- }
- // NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
- // assigned name. Otherwise, just the clusterID is returned.
- func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
- pcm.lock.RLock()
- defer pcm.lock.RUnlock()
- if info, ok := pcm.clusters[clusterID]; ok {
- if info.Name == "" {
- return clusterID
- }
- return fmt.Sprintf("%s/%s", info.Name, clusterID)
- }
- return clusterID
- }
- func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
- if !strings.Contains(nameID, "/") {
- id = nameID
- name = ""
- return
- }
- split := strings.Split(nameID, "/")
- name = split[0]
- id = split[1]
- return
- }
- // StopRefresh stops the automatic internal map refresh
- func (pcm *PrometheusClusterMap) StopRefresh() {
- if pcm.stop != nil {
- close(pcm.stop)
- pcm.stop = nil
- }
- }
|