|
|
@@ -7,13 +7,13 @@ import (
|
|
|
"fmt"
|
|
|
"io/ioutil"
|
|
|
"net/http"
|
|
|
- "os"
|
|
|
"reflect"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
+ "github.com/kubecost/cost-model/pkg/config"
|
|
|
"github.com/kubecost/cost-model/pkg/services"
|
|
|
"github.com/kubecost/cost-model/pkg/util/httputil"
|
|
|
"github.com/kubecost/cost-model/pkg/util/timeutil"
|
|
|
@@ -23,8 +23,6 @@ import (
|
|
|
v1 "k8s.io/api/core/v1"
|
|
|
"k8s.io/klog"
|
|
|
|
|
|
- k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
|
|
-
|
|
|
"github.com/julienschmidt/httprouter"
|
|
|
|
|
|
sentry "github.com/getsentry/sentry-go"
|
|
|
@@ -41,6 +39,7 @@ import (
|
|
|
"github.com/kubecost/cost-model/pkg/util/json"
|
|
|
prometheus "github.com/prometheus/client_golang/api"
|
|
|
prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
|
|
|
+ appsv1 "k8s.io/api/apps/v1"
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
|
|
|
"github.com/patrickmn/go-cache"
|
|
|
@@ -72,20 +71,23 @@ var (
|
|
|
// Accesses defines a singleton application instance, providing access to
|
|
|
// Prometheus, Kubernetes, the cloud provider, and caches.
|
|
|
type Accesses struct {
|
|
|
- Router *httprouter.Router
|
|
|
- PrometheusClient prometheus.Client
|
|
|
- ThanosClient prometheus.Client
|
|
|
- KubeClientSet kubernetes.Interface
|
|
|
- ClusterMap clusters.ClusterMap
|
|
|
- CloudProvider cloud.Provider
|
|
|
- Model *CostModel
|
|
|
- MetricsEmitter *CostModelMetricsEmitter
|
|
|
- OutOfClusterCache *cache.Cache
|
|
|
- AggregateCache *cache.Cache
|
|
|
- CostDataCache *cache.Cache
|
|
|
- ClusterCostsCache *cache.Cache
|
|
|
- CacheExpiration map[time.Duration]time.Duration
|
|
|
- AggAPI Aggregator
|
|
|
+ Router *httprouter.Router
|
|
|
+ PrometheusClient prometheus.Client
|
|
|
+ ThanosClient prometheus.Client
|
|
|
+ KubeClientSet kubernetes.Interface
|
|
|
+ ClusterCache clustercache.ClusterCache
|
|
|
+ ClusterMap clusters.ClusterMap
|
|
|
+ CloudProvider cloud.Provider
|
|
|
+ ConfigFileManager *config.ConfigFileManager
|
|
|
+ ClusterInfoProvider clusters.ClusterInfoProvider
|
|
|
+ Model *CostModel
|
|
|
+ MetricsEmitter *CostModelMetricsEmitter
|
|
|
+ OutOfClusterCache *cache.Cache
|
|
|
+ AggregateCache *cache.Cache
|
|
|
+ CostDataCache *cache.Cache
|
|
|
+ ClusterCostsCache *cache.Cache
|
|
|
+ CacheExpiration map[time.Duration]time.Duration
|
|
|
+ AggAPI Aggregator
|
|
|
// SettingsCache stores current state of app settings
|
|
|
SettingsCache *cache.Cache
|
|
|
// settingsSubscribers tracks channels through which changes to different
|
|
|
@@ -352,6 +354,14 @@ func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning
|
|
|
return resp
|
|
|
}
|
|
|
|
|
|
+// wrapAsObjectItems wraps a slice of items into an object containing a single "items" list;
|
|
|
+// this allows our k8s proxy methods to emulate the response of a List() request to the k8s API
|
|
|
+func wrapAsObjectItems(items interface{}) map[string]interface{} {
|
|
|
+ return map[string]interface{}{
|
|
|
+ "items": items,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
|
|
|
func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
@@ -694,7 +704,7 @@ func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httpro
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- data := GetClusterInfo(a.KubeClientSet, a.CloudProvider)
|
|
|
+ data := a.ClusterInfoProvider.GetClusterInfo()
|
|
|
|
|
|
w.Write(WrapData(data, nil))
|
|
|
}
|
|
|
@@ -928,12 +938,9 @@ func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Reques
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- pvList, err := a.KubeClientSet.CoreV1().PersistentVolumes().List(r.Context(), metav1.ListOptions{})
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting persistent volume %v\n", err)
|
|
|
- }
|
|
|
+ pvList := a.ClusterCache.GetAllPersistentVolumes()
|
|
|
|
|
|
- body, err := json.Marshal(pvList)
|
|
|
+ body, err := json.Marshal(wrapAsObjectItems(pvList))
|
|
|
if err != nil {
|
|
|
fmt.Fprintf(w, "Error decoding persistent volumes: "+err.Error())
|
|
|
} else {
|
|
|
@@ -945,12 +952,28 @@ func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Reques
|
|
|
func (a *Accesses) GetAllDeployments(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
- namespace := r.URL.Query().Get("namespace")
|
|
|
- deploymentsList, err := a.KubeClientSet.AppsV1().Deployments(namespace).List(r.Context(), metav1.ListOptions{})
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting deployments %v\n", err)
|
|
|
+
|
|
|
+ qp := httputil.NewQueryParams(r.URL.Query())
|
|
|
+
|
|
|
+ namespace := qp.Get("namespace", "")
|
|
|
+
|
|
|
+ deploymentsList := a.ClusterCache.GetAllDeployments()
|
|
|
+
|
|
|
+ // filter for provided namespace
|
|
|
+ var deployments []*appsv1.Deployment
|
|
|
+ if namespace == "" {
|
|
|
+ deployments = deploymentsList
|
|
|
+ } else {
|
|
|
+ deployments = []*appsv1.Deployment{}
|
|
|
+
|
|
|
+ for _, d := range deploymentsList {
|
|
|
+ if d.Namespace == namespace {
|
|
|
+ deployments = append(deployments, d)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- body, err := json.Marshal(deploymentsList)
|
|
|
+
|
|
|
+ body, err := json.Marshal(wrapAsObjectItems(deployments))
|
|
|
if err != nil {
|
|
|
fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
|
|
|
} else {
|
|
|
@@ -962,11 +985,9 @@ func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request,
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- scList, err := a.KubeClientSet.StorageV1().StorageClasses().List(r.Context(), metav1.ListOptions{})
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting storageclasses: "+err.Error())
|
|
|
- }
|
|
|
- body, err := json.Marshal(scList)
|
|
|
+ scList := a.ClusterCache.GetAllStorageClasses()
|
|
|
+
|
|
|
+ body, err := json.Marshal(wrapAsObjectItems(scList))
|
|
|
if err != nil {
|
|
|
fmt.Fprintf(w, "Error decoding storageclasses: "+err.Error())
|
|
|
} else {
|
|
|
@@ -977,12 +998,28 @@ func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request,
|
|
|
func (a *Accesses) GetAllStatefulSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
- namespace := r.URL.Query().Get("namespace")
|
|
|
- deploymentsList, err := a.KubeClientSet.AppsV1().StatefulSets(namespace).List(r.Context(), metav1.ListOptions{})
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting deployments %v\n", err)
|
|
|
+
|
|
|
+ qp := httputil.NewQueryParams(r.URL.Query())
|
|
|
+
|
|
|
+ namespace := qp.Get("namespace", "")
|
|
|
+
|
|
|
+ statefulSetsList := a.ClusterCache.GetAllStatefulSets()
|
|
|
+
|
|
|
+ // filter for provided namespace
|
|
|
+ var statefulSets []*appsv1.StatefulSet
|
|
|
+ if namespace == "" {
|
|
|
+ statefulSets = statefulSetsList
|
|
|
+ } else {
|
|
|
+ statefulSets = []*appsv1.StatefulSet{}
|
|
|
+
|
|
|
+ for _, ss := range statefulSetsList {
|
|
|
+ if ss.Namespace == namespace {
|
|
|
+ statefulSets = append(statefulSets, ss)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- body, err := json.Marshal(deploymentsList)
|
|
|
+
|
|
|
+ body, err := json.Marshal(wrapAsObjectItems(statefulSets))
|
|
|
if err != nil {
|
|
|
fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
|
|
|
} else {
|
|
|
@@ -994,12 +1031,9 @@ func (a *Accesses) GetAllNodes(w http.ResponseWriter, r *http.Request, ps httpro
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- nodeList, err := a.KubeClientSet.CoreV1().Nodes().List(r.Context(), metav1.ListOptions{})
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting node %v\n", err)
|
|
|
- }
|
|
|
+ nodeList := a.ClusterCache.GetAllNodes()
|
|
|
|
|
|
- body, err := json.Marshal(nodeList)
|
|
|
+ body, err := json.Marshal(wrapAsObjectItems(nodeList))
|
|
|
if err != nil {
|
|
|
fmt.Fprintf(w, "Error decoding nodes: "+err.Error())
|
|
|
} else {
|
|
|
@@ -1011,12 +1045,9 @@ func (a *Accesses) GetAllPods(w http.ResponseWriter, r *http.Request, ps httprou
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- podlist, err := a.KubeClientSet.CoreV1().Pods("").List(r.Context(), metav1.ListOptions{})
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting pod %v\n", err)
|
|
|
- }
|
|
|
+ podlist := a.ClusterCache.GetAllPods()
|
|
|
|
|
|
- body, err := json.Marshal(podlist)
|
|
|
+ body, err := json.Marshal(wrapAsObjectItems(podlist))
|
|
|
if err != nil {
|
|
|
fmt.Fprintf(w, "Error decoding pods: "+err.Error())
|
|
|
} else {
|
|
|
@@ -1028,11 +1059,9 @@ func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps h
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- namespaces, err := a.KubeClientSet.CoreV1().Namespaces().List(r.Context(), metav1.ListOptions{})
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting namespaces %v\n", err)
|
|
|
- }
|
|
|
- body, err := json.Marshal(namespaces)
|
|
|
+ namespaces := a.ClusterCache.GetAllNamespaces()
|
|
|
+
|
|
|
+ body, err := json.Marshal(wrapAsObjectItems(namespaces))
|
|
|
if err != nil {
|
|
|
fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
|
|
|
} else {
|
|
|
@@ -1043,11 +1072,10 @@ func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps h
|
|
|
func (a *Accesses) GetAllDaemonSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
- daemonSets, err := a.KubeClientSet.AppsV1().DaemonSets("").List(r.Context(), metav1.ListOptions{})
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting daemon sets %v\n", err)
|
|
|
- }
|
|
|
- body, err := json.Marshal(daemonSets)
|
|
|
+
|
|
|
+ daemonSets := a.ClusterCache.GetAllDaemonSets()
|
|
|
+
|
|
|
+ body, err := json.Marshal(wrapAsObjectItems(daemonSets))
|
|
|
if err != nil {
|
|
|
fmt.Fprintf(w, "Error decoding daemon set: "+err.Error())
|
|
|
} else {
|
|
|
@@ -1062,24 +1090,21 @@ func (a *Accesses) GetPod(w http.ResponseWriter, r *http.Request, ps httprouter.
|
|
|
podName := ps.ByName("name")
|
|
|
podNamespace := ps.ByName("namespace")
|
|
|
|
|
|
- // Examples for error handling:
|
|
|
- // - Use helper functions like e.g. errors.IsNotFound()
|
|
|
- // - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
|
|
|
- pod, err := a.KubeClientSet.CoreV1().Pods(podNamespace).Get(r.Context(), podName, metav1.GetOptions{})
|
|
|
- if k8serrors.IsNotFound(err) {
|
|
|
- fmt.Fprintf(w, "Pod not found\n")
|
|
|
- } else if statusError, isStatus := err.(*k8serrors.StatusError); isStatus {
|
|
|
- fmt.Fprintf(w, "Error getting pod %v\n", statusError.ErrStatus.Message)
|
|
|
- } else if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting pod: "+err.Error())
|
|
|
- } else {
|
|
|
- body, err := json.Marshal(pod)
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error decoding pod: "+err.Error())
|
|
|
- } else {
|
|
|
- w.Write(body)
|
|
|
+ // TODO: ClusterCache API could probably afford to have some better filtering
|
|
|
+ allPods := a.ClusterCache.GetAllPods()
|
|
|
+ for _, pod := range allPods {
|
|
|
+ if pod.Namespace == podNamespace && pod.Name == podName {
|
|
|
+ body, err := json.Marshal(pod)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Fprintf(w, "Error decoding pod: "+err.Error())
|
|
|
+ } else {
|
|
|
+ w.Write(body)
|
|
|
+ }
|
|
|
+ return
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ fmt.Fprintf(w, "Pod not found\n")
|
|
|
}
|
|
|
|
|
|
func (a *Accesses) PrometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
|
|
@@ -1105,9 +1130,9 @@ func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ ht
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- pConfig := make(map[string]string)
|
|
|
-
|
|
|
- pConfig["address"] = os.Getenv("PROMETHEUS_SERVER_ENDPOINT")
|
|
|
+ pConfig := map[string]string{
|
|
|
+ "address": env.GetPrometheusServerEndpoint(),
|
|
|
+ }
|
|
|
|
|
|
body, err := json.Marshal(pConfig)
|
|
|
if err != nil {
|
|
|
@@ -1140,13 +1165,10 @@ func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps ht
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- podlist, err := a.KubeClientSet.CoreV1().Pods("").List(r.Context(), metav1.ListOptions{})
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error getting pod %v\n", err)
|
|
|
- }
|
|
|
+ podlist := a.ClusterCache.GetAllPods()
|
|
|
|
|
|
- var lonePods []v1.Pod
|
|
|
- for _, pod := range podlist.Items {
|
|
|
+ var lonePods []*v1.Pod
|
|
|
+ for _, pod := range podlist {
|
|
|
if len(pod.OwnerReferences) == 0 {
|
|
|
lonePods = append(lonePods, pod)
|
|
|
}
|
|
|
@@ -1164,10 +1186,11 @@ func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- ns := os.Getenv("KUBECOST_NAMESPACE")
|
|
|
+ ns := env.GetKubecostNamespace()
|
|
|
w.Write([]byte(ns))
|
|
|
}
|
|
|
|
|
|
+// logsFor pulls the logs for a specific pod, namespace, and container
|
|
|
func logsFor(c kubernetes.Interface, namespace string, pod string, container string, dur time.Duration, ctx context.Context) (string, error) {
|
|
|
since := time.Now().UTC().Add(-dur)
|
|
|
|
|
|
@@ -1196,18 +1219,13 @@ func (a *Accesses) GetPodLogs(w http.ResponseWriter, r *http.Request, ps httprou
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- q := r.URL.Query()
|
|
|
- ns := q.Get("namespace")
|
|
|
- if ns == "" {
|
|
|
- ns = os.Getenv("KUBECOST_NAMESPACE")
|
|
|
- }
|
|
|
- pod := q.Get("pod")
|
|
|
- selector := q.Get("selector")
|
|
|
- container := q.Get("container")
|
|
|
- since := q.Get("since")
|
|
|
- if since == "" {
|
|
|
- since = "24h"
|
|
|
- }
|
|
|
+ qp := httputil.NewQueryParams(r.URL.Query())
|
|
|
+
|
|
|
+ ns := qp.Get("namespace", env.GetKubecostNamespace())
|
|
|
+ pod := qp.Get("pod", "")
|
|
|
+ selector := qp.Get("selector", "")
|
|
|
+ container := qp.Get("container", "")
|
|
|
+ since := qp.Get("since", "24h")
|
|
|
|
|
|
sinceDuration, err := time.ParseDuration(since)
|
|
|
if err != nil {
|
|
|
@@ -1284,8 +1302,6 @@ func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps http
|
|
|
|
|
|
r.ParseForm()
|
|
|
|
|
|
- //p.CloudProvider.AddServiceKey(r.PostForm)
|
|
|
-
|
|
|
key := r.PostForm.Get("key")
|
|
|
k := []byte(key)
|
|
|
err := ioutil.WriteFile("/var/configs/key.json", k, 0644)
|
|
|
@@ -1300,7 +1316,7 @@ func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps http
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- encodedValues := os.Getenv("HELM_VALUES")
|
|
|
+ encodedValues := env.Get("HELM_VALUES", "")
|
|
|
if encodedValues == "" {
|
|
|
fmt.Fprintf(w, "Values reporting disabled")
|
|
|
return
|
|
|
@@ -1319,14 +1335,15 @@ func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.P
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
+ promServer := env.GetPrometheusServerEndpoint()
|
|
|
+
|
|
|
api := prometheusAPI.NewAPI(a.PrometheusClient)
|
|
|
result, err := api.Config(r.Context())
|
|
|
if err != nil {
|
|
|
-
|
|
|
- fmt.Fprintf(w, "Using Prometheus at "+os.Getenv("PROMETHEUS_SERVER_ENDPOINT")+". Error: "+err.Error())
|
|
|
+ fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
|
|
|
} else {
|
|
|
|
|
|
- fmt.Fprintf(w, "Using Prometheus at "+os.Getenv("PROMETHEUS_SERVER_ENDPOINT")+". PrometheusConfig: "+result.YAML)
|
|
|
+ fmt.Fprintf(w, "Using Prometheus at "+promServer+". PrometheusConfig: "+result.YAML)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1441,12 +1458,24 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
|
|
|
panic(err.Error())
|
|
|
}
|
|
|
|
|
|
+ // Create ConfigFileManager for synchronization of shared configuration
|
|
|
+ confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
|
|
|
+ BucketStoreConfig: env.GetKubecostConfigBucket(),
|
|
|
+ LocalConfigPath: "/",
|
|
|
+ })
|
|
|
+
|
|
|
// Create Kubernetes Cluster Cache + Watchers
|
|
|
- k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
|
|
|
+ var k8sCache clustercache.ClusterCache
|
|
|
+ if env.IsClusterCacheFileEnabled() {
|
|
|
+ importLocation := confManager.ConfigFileAt("/var/configs/cluster-cache.json")
|
|
|
+ k8sCache = clustercache.NewClusterImporter(importLocation)
|
|
|
+ } else {
|
|
|
+ k8sCache = clustercache.NewKubernetesClusterCache(kubeClientset)
|
|
|
+ }
|
|
|
k8sCache.Run()
|
|
|
|
|
|
cloudProviderKey := env.GetCloudProviderAPIKey()
|
|
|
- cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey)
|
|
|
+ cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey, confManager)
|
|
|
if err != nil {
|
|
|
panic(err.Error())
|
|
|
}
|
|
|
@@ -1506,13 +1535,21 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // ClusterInfo Provider to provide the cluster map with local and remote cluster data
|
|
|
+ var clusterInfoProvider clusters.ClusterInfoProvider
|
|
|
+ if env.IsClusterInfoFileEnabled() {
|
|
|
+ clusterInfoFile := confManager.ConfigFileAt("/var/configs/cluster-info.json")
|
|
|
+ clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
|
|
|
+ } else {
|
|
|
+ clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
|
|
|
+ }
|
|
|
+
|
|
|
// Initialize ClusterMap for maintaining ClusterInfo by ClusterID
|
|
|
var clusterMap clusters.ClusterMap
|
|
|
- localCIProvider := NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
|
|
|
if thanosClient != nil {
|
|
|
- clusterMap = clusters.NewClusterMap(thanosClient, localCIProvider, 10*time.Minute)
|
|
|
+ clusterMap = clusters.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
|
|
|
} else {
|
|
|
- clusterMap = clusters.NewClusterMap(promCli, localCIProvider, 5*time.Minute)
|
|
|
+ clusterMap = clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
|
|
|
}
|
|
|
|
|
|
// cache responses from model and aggregation for a default of 10 minutes;
|
|
|
@@ -1541,24 +1578,27 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
|
|
|
pc = promCli
|
|
|
}
|
|
|
costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
|
|
|
- metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, costModel)
|
|
|
+ metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
|
|
|
|
|
|
a := &Accesses{
|
|
|
- Router: httprouter.New(),
|
|
|
- PrometheusClient: promCli,
|
|
|
- ThanosClient: thanosClient,
|
|
|
- KubeClientSet: kubeClientset,
|
|
|
- ClusterMap: clusterMap,
|
|
|
- CloudProvider: cloudProvider,
|
|
|
- Model: costModel,
|
|
|
- MetricsEmitter: metricsEmitter,
|
|
|
- AggregateCache: aggregateCache,
|
|
|
- CostDataCache: costDataCache,
|
|
|
- ClusterCostsCache: clusterCostsCache,
|
|
|
- OutOfClusterCache: outOfClusterCache,
|
|
|
- SettingsCache: settingsCache,
|
|
|
- CacheExpiration: cacheExpiration,
|
|
|
- httpServices: services.NewCostModelServices(),
|
|
|
+ Router: httprouter.New(),
|
|
|
+ PrometheusClient: promCli,
|
|
|
+ ThanosClient: thanosClient,
|
|
|
+ KubeClientSet: kubeClientset,
|
|
|
+ ClusterCache: k8sCache,
|
|
|
+ ClusterMap: clusterMap,
|
|
|
+ CloudProvider: cloudProvider,
|
|
|
+ ConfigFileManager: confManager,
|
|
|
+ ClusterInfoProvider: clusterInfoProvider,
|
|
|
+ Model: costModel,
|
|
|
+ MetricsEmitter: metricsEmitter,
|
|
|
+ AggregateCache: aggregateCache,
|
|
|
+ CostDataCache: costDataCache,
|
|
|
+ ClusterCostsCache: clusterCostsCache,
|
|
|
+ OutOfClusterCache: outOfClusterCache,
|
|
|
+ SettingsCache: settingsCache,
|
|
|
+ CacheExpiration: cacheExpiration,
|
|
|
+ httpServices: services.NewCostModelServices(),
|
|
|
}
|
|
|
// Use the Accesses instance, itself, as the CostModelAggregator. This is
|
|
|
// confusing and unconventional, but necessary so that we can swap it
|