|
|
@@ -2,11 +2,9 @@ package costmodel
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
- "crypto/tls"
|
|
|
"encoding/json"
|
|
|
"flag"
|
|
|
"fmt"
|
|
|
- "net"
|
|
|
"net/http"
|
|
|
"reflect"
|
|
|
"strconv"
|
|
|
@@ -22,6 +20,7 @@ import (
|
|
|
costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
|
|
|
"github.com/kubecost/cost-model/pkg/clustercache"
|
|
|
cm "github.com/kubecost/cost-model/pkg/clustermanager"
|
|
|
+ "github.com/kubecost/cost-model/pkg/costmodel/clusters"
|
|
|
"github.com/kubecost/cost-model/pkg/env"
|
|
|
"github.com/kubecost/cost-model/pkg/errors"
|
|
|
"github.com/kubecost/cost-model/pkg/prom"
|
|
|
@@ -46,13 +45,7 @@ const (
|
|
|
|
|
|
var (
|
|
|
// gitCommit is set by the build system
|
|
|
- gitCommit string
|
|
|
- dbBasicAuthUsername string = env.GetDBBasicAuthUsername()
|
|
|
- dbBasicAuthPW string = env.GetDBBasicAuthUserPassword()
|
|
|
- dbBearerToken string = env.GetDBBearerToken()
|
|
|
- multiclusterDBBasicAuthUsername string = env.GetMultiClusterBasicAuthUsername()
|
|
|
- multiclusterDBBasicAuthPW string = env.GetMultiClusterBasicAuthPassword()
|
|
|
- multiClusterBearerToken string = env.GetMultiClusterBearerToken()
|
|
|
+ gitCommit string
|
|
|
)
|
|
|
|
|
|
var Router = httprouter.New()
|
|
|
@@ -63,6 +56,7 @@ type Accesses struct {
|
|
|
ThanosClient prometheusClient.Client
|
|
|
KubeClientSet kubernetes.Interface
|
|
|
ClusterManager *cm.ClusterManager
|
|
|
+ ClusterMap clusters.ClusterMap
|
|
|
Cloud costAnalyzerCloud.Provider
|
|
|
CPUPriceRecorder *prometheus.GaugeVec
|
|
|
RAMPriceRecorder *prometheus.GaugeVec
|
|
|
@@ -627,6 +621,15 @@ func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httpro
|
|
|
w.Write(WrapData(data, nil))
|
|
|
}
|
|
|
|
|
|
+func (p *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
+ w.Header().Set("Content-Type", "application/json")
|
|
|
+ w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
+
|
|
|
+ data := p.ClusterMap.AsMap()
|
|
|
+
|
|
|
+ w.Write(WrapData(data, nil))
|
|
|
+}
|
|
|
+
|
|
|
func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
@@ -637,7 +640,8 @@ func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Reques
|
|
|
func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
- w.Write(WrapData(ValidatePrometheus(p.PrometheusClient, false)))
|
|
|
+
|
|
|
+ w.Write(WrapData(prom.Validate(p.PrometheusClient)))
|
|
|
}
|
|
|
|
|
|
// Creates a new ClusterManager instance using a boltdb storage. If that fails,
|
|
|
@@ -730,30 +734,18 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
|
|
|
queryConcurrency := env.GetMaxQueryConcurrency()
|
|
|
klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
|
|
|
|
|
|
- tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
|
|
|
- var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
|
|
|
- Proxy: http.ProxyFromEnvironment,
|
|
|
- DialContext: (&net.Dialer{
|
|
|
- Timeout: 120 * time.Second,
|
|
|
- KeepAlive: 120 * time.Second,
|
|
|
- }).DialContext,
|
|
|
- TLSHandshakeTimeout: 10 * time.Second,
|
|
|
- TLSClientConfig: tlsConfig,
|
|
|
- }
|
|
|
+ timeout := 120 * time.Second
|
|
|
+ keepAlive := 120 * time.Second
|
|
|
|
|
|
- pc := prometheusClient.Config{
|
|
|
- Address: address,
|
|
|
- RoundTripper: LongTimeoutRoundTripper,
|
|
|
- }
|
|
|
- promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency, dbBasicAuthUsername, dbBasicAuthPW, dbBearerToken, env.GetQueryLoggingFile())
|
|
|
-
|
|
|
- m, err := ValidatePrometheus(promCli, false)
|
|
|
+ promCli, _ := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
|
|
|
+ m, err := prom.Validate(promCli)
|
|
|
if err != nil || m.Running == false {
|
|
|
if err != nil {
|
|
|
klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
|
|
|
} else if m.Running == false {
|
|
|
klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
|
|
|
}
|
|
|
+
|
|
|
api := prometheusAPI.NewAPI(promCli)
|
|
|
_, err = api.Config(context.Background())
|
|
|
if err != nil {
|
|
|
@@ -808,6 +800,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
kubecostNamespace := env.GetKubecostNamespace()
|
|
|
// We need an initial invocation because the init of the cache has happened before we had access to the provider.
|
|
|
configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
|
|
|
@@ -934,30 +927,6 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
|
|
|
// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
|
|
|
outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
|
|
|
|
|
|
- A = Accesses{
|
|
|
- PrometheusClient: promCli,
|
|
|
- KubeClientSet: kubeClientset,
|
|
|
- ClusterManager: clusterManager,
|
|
|
- Cloud: cloudProvider,
|
|
|
- CPUPriceRecorder: cpuGv,
|
|
|
- RAMPriceRecorder: ramGv,
|
|
|
- GPUPriceRecorder: gpuGv,
|
|
|
- NodeTotalPriceRecorder: totalGv,
|
|
|
- NodeSpotRecorder: spotGv,
|
|
|
- RAMAllocationRecorder: RAMAllocation,
|
|
|
- CPUAllocationRecorder: CPUAllocation,
|
|
|
- GPUAllocationRecorder: GPUAllocation,
|
|
|
- PVAllocationRecorder: PVAllocation,
|
|
|
- NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
|
|
|
- NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
|
|
|
- NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
|
|
|
- PersistentVolumePriceRecorder: pvGv,
|
|
|
- ClusterManagementCostRecorder: ClusterManagementCostRecorder,
|
|
|
- LBCostRecorder: LBCostRecorder,
|
|
|
- Model: NewCostModel(k8sCache),
|
|
|
- OutOfClusterCache: outOfClusterCache,
|
|
|
- }
|
|
|
-
|
|
|
remoteEnabled := env.IsRemoteEnabled()
|
|
|
if remoteEnabled {
|
|
|
info, err := cloudProvider.ClusterInfo()
|
|
|
@@ -972,35 +941,21 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
|
|
|
}
|
|
|
|
|
|
// Thanos Client
|
|
|
+ var thanosClient prometheusClient.Client
|
|
|
if thanos.IsEnabled() {
|
|
|
- thanosUrl := thanos.QueryURL()
|
|
|
-
|
|
|
- if thanosUrl != "" {
|
|
|
- var thanosRT http.RoundTripper = &http.Transport{
|
|
|
- Proxy: http.ProxyFromEnvironment,
|
|
|
- DialContext: (&net.Dialer{
|
|
|
- Timeout: 120 * time.Second,
|
|
|
- KeepAlive: 120 * time.Second,
|
|
|
- }).DialContext,
|
|
|
- TLSHandshakeTimeout: 10 * time.Second,
|
|
|
- TLSClientConfig: tlsConfig,
|
|
|
- }
|
|
|
-
|
|
|
- thanosConfig := prometheusClient.Config{
|
|
|
- Address: thanosUrl,
|
|
|
- RoundTripper: thanosRT,
|
|
|
- }
|
|
|
+ thanosAddress := thanos.QueryURL()
|
|
|
|
|
|
- thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency, multiclusterDBBasicAuthUsername, multiclusterDBBasicAuthPW, multiClusterBearerToken, env.GetQueryLoggingFile())
|
|
|
+ if thanosAddress != "" {
|
|
|
+ thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
|
|
|
|
|
|
- _, err = ValidatePrometheus(thanosCli, true)
|
|
|
+ _, err = prom.Validate(thanosCli)
|
|
|
if err != nil {
|
|
|
- klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
|
|
|
- A.ThanosClient = thanosCli
|
|
|
+ klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
|
|
|
+ thanosClient = thanosCli
|
|
|
} else {
|
|
|
- klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
|
|
|
+ klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
|
|
|
|
|
|
- A.ThanosClient = thanosCli
|
|
|
+ thanosClient = thanosCli
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
@@ -1008,6 +963,40 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
|
|
|
+ var clusterMap clusters.ClusterMap
|
|
|
+ if thanosClient != nil {
|
|
|
+ clusterMap = clusters.NewClusterMap(thanosClient, 10*time.Minute)
|
|
|
+ } else {
|
|
|
+ clusterMap = clusters.NewClusterMap(promCli, 5*time.Minute)
|
|
|
+ }
|
|
|
+
|
|
|
+ A = Accesses{
|
|
|
+ PrometheusClient: promCli,
|
|
|
+ ThanosClient: thanosClient,
|
|
|
+ KubeClientSet: kubeClientset,
|
|
|
+ ClusterManager: clusterManager,
|
|
|
+ ClusterMap: clusterMap,
|
|
|
+ Cloud: cloudProvider,
|
|
|
+ CPUPriceRecorder: cpuGv,
|
|
|
+ RAMPriceRecorder: ramGv,
|
|
|
+ GPUPriceRecorder: gpuGv,
|
|
|
+ NodeTotalPriceRecorder: totalGv,
|
|
|
+ NodeSpotRecorder: spotGv,
|
|
|
+ RAMAllocationRecorder: RAMAllocation,
|
|
|
+ CPUAllocationRecorder: CPUAllocation,
|
|
|
+ GPUAllocationRecorder: GPUAllocation,
|
|
|
+ PVAllocationRecorder: PVAllocation,
|
|
|
+ NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
|
|
|
+ NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
|
|
|
+ NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
|
|
|
+ PersistentVolumePriceRecorder: pvGv,
|
|
|
+ ClusterManagementCostRecorder: ClusterManagementCostRecorder,
|
|
|
+ LBCostRecorder: LBCostRecorder,
|
|
|
+ Model: NewCostModel(k8sCache, clusterMap),
|
|
|
+ OutOfClusterCache: outOfClusterCache,
|
|
|
+ }
|
|
|
+
|
|
|
err = A.Cloud.DownloadPricingData()
|
|
|
if err != nil {
|
|
|
klog.V(1).Info("Failed to download pricing data: " + err.Error())
|
|
|
@@ -1028,8 +1017,11 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
|
|
|
Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
|
|
|
Router.GET("/managementPlatform", A.ManagementPlatform)
|
|
|
Router.GET("/clusterInfo", A.ClusterInfo)
|
|
|
- Router.GET("/clusters", managerEndpoints.GetAllClusters)
|
|
|
+ Router.GET("/clusterInfoMap", A.GetClusterInfoMap)
|
|
|
Router.GET("/serviceAccountStatus", A.GetServiceAccountStatus)
|
|
|
+
|
|
|
+ // cluster manager endpoints
|
|
|
+ Router.GET("/clusters", managerEndpoints.GetAllClusters)
|
|
|
Router.PUT("/clusters", managerEndpoints.PutCluster)
|
|
|
Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
|
|
|
}
|